Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base toss #2525

Merged
merged 11 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2270,17 +2270,16 @@ bool MetaClient::checkShadowAccountFromCache(const std::string& account) const {
return false;
}

TermID MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) const {
static TermID notFound = -1;
StatusOr<TermID> MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) const {
folly::RWSpinLock::ReadHolder holder(localCacheLock_);
auto spaceInfo = localCache_.find(spaceId);
if (spaceInfo == localCache_.end()) {
return notFound;
return Status::Error("Term not found!");
}

auto termInfo = spaceInfo->second->termOfPartition_.find(partId);
if (termInfo == spaceInfo->second->termOfPartition_.end()) {
return notFound;
return Status::Error("Term not found!");
}

return termInfo->second;
Expand Down
12 changes: 10 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
DECLARE_int32(meta_client_retry_times);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace storage {
class MetaClientTestUpdater;
} // namespace storage
} // namespace nebula

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -182,6 +188,8 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
friend class KillQueryMetaWrapper;
FRIEND_TEST(ChainAddEdgesTest, AddEdgesLocalTest);
friend class storage::MetaClientTestUpdater;

public:
MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
Expand Down Expand Up @@ -555,9 +563,9 @@ class MetaClient {

bool authCheckFromCache(const std::string& account, const std::string& password) const;

bool checkShadowAccountFromCache(const std::string& account) const;
StatusOr<TermID> getTermFromCache(GraphSpaceID spaceId, PartitionID) const;

TermID getTermFromCache(GraphSpaceID spaceId, PartitionID) const;
bool checkShadowAccountFromCache(const std::string& account) const;

StatusOr<std::vector<HostAddr>> getStorageHosts() const;

Expand Down
16 changes: 10 additions & 6 deletions src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
evb,
std::move(requests),
[=](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::AddEdgesRequest& r) {
return useToss ? client->future_addEdgesAtomic(r) : client->future_addEdges(r);
return useToss ? client->future_chainAddEdges(r) : client->future_addEdges(r);
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
});
}

Expand Down Expand Up @@ -405,7 +405,8 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat
bool insertable,
std::vector<std::string> returnProps,
std::string condition,
folly::EventBase* evb) {
folly::EventBase* evb,
bool useExperimentalFeature) {
auto cbStatus = getIdFromEdgeKey(space);
if (!cbStatus.ok()) {
return folly::makeFuture<StatusOr<storage::cpp2::UpdateResponse>>(cbStatus.status());
Expand Down Expand Up @@ -443,10 +444,13 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat
}
request.second = std::move(req);

return getResponse(evb,
std::move(request),
[](cpp2::GraphStorageServiceAsyncClient* client,
const cpp2::UpdateEdgeRequest& r) { return client->future_updateEdge(r); });
return getResponse(
evb,
std::move(request),
[=](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateEdgeRequest& r) {
return useExperimentalFeature ? client->future_chainUpdateEdge(r)
: client->future_updateEdge(r);
});
}

folly::Future<StatusOr<cpp2::GetUUIDResp>> GraphStorageClient::getUUID(GraphSpaceID space,
Expand Down
3 changes: 2 additions & 1 deletion src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
bool insertable,
std::vector<std::string> returnProps,
std::string condition,
folly::EventBase* evb = nullptr);
folly::EventBase* evb = nullptr,
bool useExperimentalFeature = false);

folly::Future<StatusOr<cpp2::GetUUIDResp>> getUUID(GraphSpaceID space,
const std::string& name,
Expand Down
169 changes: 78 additions & 91 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace nebula {
namespace storage {

template <typename T>
nebula::cpp2::ErrorCode extractErrorCode(T& tryResp) {
::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) {
if (!tryResp.hasValue()) {
LOG(ERROR) << tryResp.exception().what();
return nebula::cpp2::ErrorCode::E_RPC_FAILURE;
Expand All @@ -37,112 +37,99 @@ nebula::cpp2::ErrorCode extractErrorCode(T& tryResp) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

StatusOr<HostAddr> InternalStorageClient::getFuzzyLeader(GraphSpaceID spaceId,
PartitionID partId) const {
return getLeader(spaceId, partId);
}

folly::SemiFuture<nebula::cpp2::ErrorCode> InternalStorageClient::forwardTransaction(
int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::EventBase* evb) {
auto c = folly::makePromiseContract<nebula::cpp2::ErrorCode>();
forwardTransactionImpl(txnId, spaceId, partId, std::move(data), std::move(c.first), evb);
return std::move(c.second);
}

void InternalStorageClient::forwardTransactionImpl(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::Promise<nebula::cpp2::ErrorCode> p,
folly::EventBase* evb) {
VLOG(1) << "forwardTransactionImpl txnId=" << txnId;
auto statusOrLeader = getFuzzyLeader(spaceId, partId);
if (!statusOrLeader.ok()) {
p.setValue(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedRequest,
TermID termOfSrc,
folly::Optional<int64_t> optVersion,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = reversedRequest.get_space_id();
auto partId = reversedRequest.get_part_id();
auto optLeader = getLeader(spaceId, partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}. ", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& dest = statusOrLeader.value();
dest.port += kInternalPortOffset;

cpp2::InternalTxnRequest interReq;
interReq.set_txn_id(txnId);
interReq.set_space_id(spaceId);
interReq.set_part_id(partId);
interReq.set_position(1);
(*interReq.data_ref()).resize(2);
(*interReq.data_ref()).back().emplace_back(data);
getResponse(
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(1) << "leader host: " << leader;

cpp2::ChainUpdateEdgeRequest chainReq;
chainReq.set_update_edge_request(reversedRequest);
chainReq.set_term(termOfSrc);
if (optVersion) {
chainReq.set_edge_version(optVersion.value());
}
auto resp = getResponse(
evb,
std::make_pair(dest, interReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::InternalTxnRequest& r) {
return client->future_forwardTransaction(r);
})
.thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = extractErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return forwardTransactionImpl(txnId, spaceId, partId, std::move(data), std::move(p), evb);
} else {
p.setValue(code);
}
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) {
return client->future_chainUpdateEdge(r);
});
}

folly::SemiFuture<ErrOrVal> InternalStorageClient::getValue(size_t vIdLen,
GraphSpaceID spaceId,
folly::StringPiece key,
folly::EventBase* evb) {
auto srcVid = key.subpiece(sizeof(PartitionID), vIdLen);
auto partId = metaClient_->partId(spaceId, srcVid.str());

auto c = folly::makePromiseContract<ErrOrVal>();
getValueImpl(spaceId, partId, key.str(), std::move(c.first), evb);
return std::move(c.second);
}

void InternalStorageClient::getValueImpl(GraphSpaceID spaceId,
PartitionID partId,
std::string&& key,
folly::Promise<ErrOrVal> p,
folly::EventBase* evb) {
std::pair<HostAddr, cpp2::GetValueRequest> req;
auto stLeaderHost = getFuzzyLeader(spaceId, partId);
if (!stLeaderHost.ok()) {
if (stLeaderHost.status().toString().find("partid")) {
p.setValue(nebula::cpp2::ErrorCode::E_PART_NOT_FOUND);
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == ::nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainUpdateEdge(reversedRequest, termOfSrc, optVersion, std::move(p));
} else {
p.setValue(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
p.setValue(code);
}
return;
}
req.first = stLeaderHost.value();
req.first.port += kInternalPortOffset;
});
}

req.second.set_space_id(spaceId);
req.second.set_part_id(partId);
req.second.set_key(key);
void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
TermID termId,
folly::Optional<int64_t> optVersion,
folly::Promise<nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = directReq.get_space_id();
auto partId = directReq.get_parts().begin()->first;
auto optLeader = getLeader(directReq.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId);
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(1) << "leader host: " << leader;

auto remote = [](cpp2::InternalStorageServiceAsyncClient* client,
const cpp2::GetValueRequest& r) { return client->future_getValue(r); };
cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) {
return client->future_chainAddEdges(r);
});

auto cb = [=, p = std::move(p)](auto&& t) mutable {
auto code = extractErrorCode(t);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
p.setValue(t.value().value().get_value());
} else if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
// retry directly may easily get same error
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return getValueImpl(spaceId, partId, std::move(key), std::move(p), evb);
chainAddEdges(directReq, termId, optVersion, std::move(p));
} else {
p.setValue(code);
}
};
return;
});
}

getResponse(evb, std::move(req), remote).thenTry(std::move(cb));
cpp2::ChainAddEdgesRequest InternalStorageClient::makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
folly::Optional<int64_t> ver) {
cpp2::ChainAddEdgesRequest ret;
ret.set_space_id(req.get_space_id());
ret.set_parts(req.get_parts());
ret.set_prop_names(req.get_prop_names());
ret.set_if_not_exists(req.get_if_not_exists());
ret.set_term(termId);
if (ver) {
ret.set_edge_version(ver.value());
}
return ret;
}

} // namespace storage
Expand Down
44 changes: 16 additions & 28 deletions src/clients/storage/InternalStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
namespace nebula {
namespace storage {

typedef ErrorOr<nebula::cpp2::ErrorCode, std::string> ErrOrVal;

/**
* A wrapper class for InternalStorageServiceAsyncClient thrift API
*
Expand All @@ -33,32 +31,22 @@ class InternalStorageClient : public StorageClientBase<cpp2::InternalStorageServ
: Parent(ioThreadPool, metaClient) {}
virtual ~InternalStorageClient() = default;

folly::SemiFuture<nebula::cpp2::ErrorCode> forwardTransaction(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::EventBase* evb = nullptr);

folly::SemiFuture<ErrOrVal> getValue(size_t vIdLen,
GraphSpaceID spaceId,
folly::StringPiece key,
folly::EventBase* evb = nullptr);

protected:
StatusOr<HostAddr> getFuzzyLeader(GraphSpaceID spaceId, PartitionID partId) const;

void forwardTransactionImpl(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::Promise<nebula::cpp2::ErrorCode> p,
folly::EventBase* evb);

void getValueImpl(GraphSpaceID spaceId,
PartitionID partId,
std::string&& key,
folly::Promise<ErrOrVal> p,
folly::EventBase* evb = nullptr);
virtual void chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedRequest,
TermID termOfSrc,
folly::Optional<int64_t> optVersion,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

virtual void chainAddEdges(cpp2::AddEdgesRequest& req,
TermID termId,
folly::Optional<int64_t> optVersion,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

private:
cpp2::ChainAddEdgesRequest makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
folly::Optional<int64_t> optVersion);
};

} // namespace storage
Expand Down
8 changes: 0 additions & 8 deletions src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,6 @@ class StorageClientBase {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::InternalTxnRequest& req) const {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::GetValueRequest& req) const {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::ScanEdgeRequest& req) const {
return {req.get_part_id()};
}
Expand Down
7 changes: 7 additions & 0 deletions src/common/utils/MemoryLockWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ class MemoryLockGuard {
return *iter_;
}

// this will manual set the lock to unlocked state
// which mean will not release all locks automaticly
// please make sure you really know the side effect
void forceLock() { locked_ = true; }

void forceUnlock() { locked_ = false; }

protected:
MemoryLockCore<Key>* lock_;
std::vector<Key> keys_;
Expand Down
Loading