Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

To support delete in toss. #3374

Merged
merged 12 commits into from
Dec 29, 2021
45 changes: 44 additions & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
auto partId = directReq.get_parts().begin()->first;
auto optLeader = getLeader(directReq.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId);
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
Expand Down Expand Up @@ -131,5 +132,47 @@ cpp2::ChainAddEdgesRequest InternalStorageClient::makeChainAddReq(const cpp2::Ad
return ret;
}

void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = req.get_space_id();
auto partId = req.get_parts().begin()->first;
auto optLeader = getLeader(req.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
return;
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(2) << "leader host: " << leader;

cpp2::ChainDeleteEdgesRequest chainReq;
chainReq.space_id_ref() = req.get_space_id();
chainReq.parts_ref() = req.get_parts();
chainReq.txn_id_ref() = txnId;
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->future_chainDeleteEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainDeleteEdges(req, txnId, termId, std::move(p));
} else {
p.setValue(code);
}
return;
});
}

} // namespace storage
} // namespace nebula
6 changes: 6 additions & 0 deletions src/clients/storage/InternalStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ class InternalStorageClient
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

virtual void chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

private:
cpp2::ChainAddEdgesRequest makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
Expand Down
6 changes: 4 additions & 2 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteEdges(

return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return client->future_deleteEdges(r);
[useToss = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return useToss ? client->future_chainDeleteEdges(r)
: client->future_deleteEdges(r);
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/utils/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ enum class NebulaKeyType : uint32_t {
kOperation = 0x00000005,
kKeyValue = 0x00000006,
kVertex = 0x00000007,
kPrime = 0x00000008, // used in TOSS, if we write a lock succeed
kDoublePrime = 0x00000009, // used in TOSS, if we get RPC back from remote.
};

enum class NebulaSystemKeyType : uint32_t {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "graph/context/QueryContext.h"
#include "graph/executor/mutate/DeleteExecutor.h"
#include "graph/planner/plan/Mutate.h"
#include "graph/service/GraphFlags.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
Expand Down Expand Up @@ -208,6 +209,7 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->deleteEdges(param, std::move(edgeKeys))
Expand Down
23 changes: 11 additions & 12 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ service GraphStorageService {

UpdateResponse chainUpdateEdge(1: UpdateEdgeRequest req);
ExecResponse chainAddEdges(1: AddEdgesRequest req);
ExecResponse chainDeleteEdges(1: DeleteEdgesRequest req);

KVGetResponse get(1: KVGetRequest req);
ExecResponse put(1: KVPutRequest req);
Expand Down Expand Up @@ -854,17 +855,6 @@ service StorageAdminService {
//
//////////////////////////////////////////////////////////

// transaction request
struct InternalTxnRequest {
1: i64 txn_id,
2: map<common.PartitionID, i64> term_of_parts,
3: optional AddEdgesRequest add_edge_req,
4: optional UpdateEdgeRequest upd_edge_req,
5: optional map<common.PartitionID, list<i64>>(
cpp.template = "std::unordered_map") edge_ver,
}


struct ChainAddEdgesRequest {
1: common.GraphSpaceID space_id,
// partId => edges
Expand All @@ -875,7 +865,6 @@ struct ChainAddEdgesRequest {
3: list<binary> prop_names,
// if true, when edge already exists, do nothing
4: bool if_not_exists,
// 5: map<common.PartitionID, i64> term_of_parts,
5: i64 term
6: optional i64 edge_version
// 6: optional map<common.PartitionID, list<i64>>(
Expand All @@ -891,7 +880,17 @@ struct ChainUpdateEdgeRequest {
5: required list<common.PartitionID> parts,
}

struct ChainDeleteEdgesRequest {
1: common.GraphSpaceID space_id,
// partId => edgeKeys
2: map<common.PartitionID, list<EdgeKey>>
(cpp.template = "std::unordered_map") parts,
3: binary txn_id
4: i64 term,
}

service InternalStorageService {
ExecResponse chainAddEdges(1: ChainAddEdgesRequest req);
UpdateResponse chainUpdateEdge(1: ChainUpdateEdgeRequest req);
ExecResponse chainDeleteEdges(1: ChainDeleteEdgesRequest req);
}
4 changes: 2 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
// Make the number of values are an even number
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
VLOG(2) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1]);
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down Expand Up @@ -295,7 +295,7 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
case OP_BATCH_WRITE: {
auto data = decodeBatchValue(log);
for (auto& op : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
VLOG(2) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
<< ", val=" << folly::hexlify(op.second.second);
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (op.first == BatchLogType::OP_BATCH_PUT) {
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2026,7 +2026,6 @@ void RaftPart::checkRemoteListeners(const std::set<HostAddr>& expected) {
}
}
}

bool RaftPart::leaseValid() {
std::lock_guard<std::mutex> g(raftLock_);
if (hosts_.empty()) {
Expand Down
12 changes: 8 additions & 4 deletions src/mock/MockData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ std::vector<VertexID> MockData::mockPlayerVerticeIds() {
return ret;
}

std::vector<EdgeData> MockData::mockEdges(bool upper) {
std::vector<EdgeData> MockData::mockEdges(bool upper, bool hasInEdges) {
std::vector<EdgeData> ret;
// Use serve data, positive edgeType is 101, reverse edgeType is -101
for (auto& serve : serves_) {
Expand Down Expand Up @@ -788,7 +788,9 @@ std::vector<EdgeData> MockData::mockEdges(bool upper) {
positiveEdge.props_ = std::move(props);
auto reverseData = getReverseEdge(positiveEdge);
ret.emplace_back(std::move(positiveEdge));
ret.emplace_back(std::move(reverseData));
if (hasInEdges) {
ret.emplace_back(std::move(reverseData));
}
}
return ret;
}
Expand Down Expand Up @@ -947,11 +949,13 @@ nebula::storage::cpp2::DeleteVerticesRequest MockData::mockDeleteVerticesReq(int
return req;
}

nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper, int32_t parts) {
nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper,
int32_t parts,
bool hasInEdges) {
nebula::storage::cpp2::AddEdgesRequest req;
req.space_id_ref() = 1;
req.if_not_exists_ref() = true;
auto retRecs = mockEdges(upper);
auto retRecs = mockEdges(upper, hasInEdges);
for (auto& rec : retRecs) {
nebula::storage::cpp2::NewEdge newEdge;
nebula::storage::cpp2::EdgeKey edgeKey;
Expand Down
6 changes: 4 additions & 2 deletions src/mock/MockData.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class MockData {
static std::vector<std::pair<PartitionID, std::string>> mockPlayerIndexKeys(bool upper = false);

// generate serve edge
static std::vector<EdgeData> mockEdges(bool upper = false);
// param: includeInEdges, if the return set has both out and in edges
static std::vector<EdgeData> mockEdges(bool upper = false, bool includeInEdges = true);

static std::vector<std::pair<PartitionID, std::string>> mockServeIndexKeys();

Expand Down Expand Up @@ -169,7 +170,8 @@ class MockData {
int32_t parts = 6);

static nebula::storage::cpp2::AddEdgesRequest mockAddEdgesReq(bool upper = false,
int32_t parts = 6);
int32_t parts = 6,
bool hasInEdges = true);

static nebula::storage::cpp2::DeleteVerticesRequest mockDeleteVerticesReq(int32_t parts = 6);

Expand Down
13 changes: 9 additions & 4 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,22 @@ nebula_add_library(
storage_transaction_executor OBJECT
transaction/TransactionManager.cpp
transaction/ConsistUtil.cpp
transaction/ChainUpdateEdgeProcessorLocal.cpp
transaction/ChainUpdateEdgeProcessorRemote.cpp
transaction/ChainUpdateEdgeLocalProcessor.cpp
transaction/ChainUpdateEdgeRemoteProcessor.cpp
transaction/ChainResumeProcessor.cpp
transaction/ChainAddEdgesGroupProcessor.cpp
transaction/ChainAddEdgesProcessorLocal.cpp
transaction/ChainAddEdgesProcessorRemote.cpp
transaction/ChainAddEdgesLocalProcessor.cpp
transaction/ChainAddEdgesRemoteProcessor.cpp
transaction/ResumeAddEdgeProcessor.cpp
transaction/ResumeAddEdgeRemoteProcessor.cpp
transaction/ResumeUpdateProcessor.cpp
transaction/ResumeUpdateRemoteProcessor.cpp
transaction/ChainProcessorFactory.cpp
transaction/ChainDeleteEdgesGroupProcessor.cpp
transaction/ChainDeleteEdgesLocalProcessor.cpp
transaction/ChainDeleteEdgesRemoteProcessor.cpp
transaction/ChainDeleteEdgesResumeProcessor.cpp
transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp
)

nebula_add_library(
Expand Down
11 changes: 9 additions & 2 deletions src/storage/GraphStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
#include "storage/query/ScanEdgeProcessor.h"
#include "storage/query/ScanVertexProcessor.h"
#include "storage/transaction/ChainAddEdgesGroupProcessor.h"
#include "storage/transaction/ChainUpdateEdgeProcessorLocal.h"
#include "storage/transaction/ChainDeleteEdgesGroupProcessor.h"
#include "storage/transaction/ChainUpdateEdgeLocalProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -112,7 +113,7 @@ folly::Future<cpp2::UpdateResponse> GraphStorageServiceHandler::future_updateEdg

folly::Future<cpp2::UpdateResponse> GraphStorageServiceHandler::future_chainUpdateEdge(
const cpp2::UpdateEdgeRequest& req) {
auto* proc = ChainUpdateEdgeProcessorLocal::instance(env_);
auto* proc = ChainUpdateEdgeLocalProcessor::instance(env_);
RETURN_FUTURE(proc);
}

Expand Down Expand Up @@ -160,6 +161,12 @@ folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_chainAddEdg
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_chainDeleteEdges(
const cpp2::DeleteEdgesRequest& req) {
auto* processor = ChainDeleteEdgesGroupProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_put(
const cpp2::KVPutRequest& req) {
auto* processor = PutProcessor::instance(env_);
Expand Down
3 changes: 3 additions & 0 deletions src/storage/GraphStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf {

folly::Future<cpp2::ScanResponse> future_scanVertex(const cpp2::ScanVertexRequest& req) override;

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::DeleteEdgesRequest& req) override;

folly::Future<cpp2::ScanResponse> future_scanEdge(const cpp2::ScanEdgeRequest& req) override;

folly::Future<cpp2::GetUUIDResp> future_getUUID(const cpp2::GetUUIDReq& req) override;
Expand Down
15 changes: 11 additions & 4 deletions src/storage/InternalStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

#include "storage/InternalStorageServiceHandler.h"

#include "storage/transaction/ChainAddEdgesProcessorRemote.h"
#include "storage/transaction/ChainUpdateEdgeProcessorRemote.h"
#include "storage/transaction/ChainAddEdgesRemoteProcessor.h"
#include "storage/transaction/ChainDeleteEdgesRemoteProcessor.h"
#include "storage/transaction/ChainUpdateEdgeRemoteProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand All @@ -20,13 +21,19 @@ InternalStorageServiceHandler::InternalStorageServiceHandler(StorageEnv* env) :

folly::Future<cpp2::ExecResponse> InternalStorageServiceHandler::future_chainAddEdges(
const cpp2::ChainAddEdgesRequest& req) {
auto* processor = ChainAddEdgesProcessorRemote::instance(env_);
auto* processor = ChainAddEdgesRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::UpdateResponse> InternalStorageServiceHandler::future_chainUpdateEdge(
const cpp2::ChainUpdateEdgeRequest& req) {
auto* processor = ChainUpdateEdgeProcessorRemote::instance(env_);
auto* processor = ChainUpdateEdgeRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> InternalStorageServiceHandler::future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& req) {
auto* processor = ChainDeleteEdgesRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

Expand Down
3 changes: 3 additions & 0 deletions src/storage/InternalStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class InternalStorageServiceHandler final : public cpp2::InternalStorageServiceS
folly::Future<cpp2::UpdateResponse> future_chainUpdateEdge(
const cpp2::ChainUpdateEdgeRequest& p_req);

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& p_req);

private:
StorageEnv* env_{nullptr};
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extern ProcessorCounters kAddEdgesCounters;

class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
friend class TransactionManager;
friend class ChainAddEdgesProcessorLocal;
friend class ChainAddEdgesLocalProcessor;

public:
static AddEdgesProcessor* instance(StorageEnv* env,
Expand Down
Loading