Skip to content

Commit

Permalink
Add comments & accumulated bug fixes for TOSS (#3643) (#3833)
Browse files Browse the repository at this point in the history
1. Add comments and adjust some log levels.
2. Accumulated bug fixes for TOSS:
  2.1 One space could be affected by another space when recovering prime edges.
  2.2 When kvstore->get() is called and the leader lease is invalid, it used to report
      E_LEADER_CHANGED; it now reports E_LEADER_LEASE_FAILED.
  2.3 Print the whole execution trace in ChainProcessor::finish() to make it much more grep-friendly.
  2.4 Some memory lock functions were hard to understand (forceLock, forceUnlock); they are replaced
      by "setAutoUnlock(bool)".
  • Loading branch information
liuyu85cn authored Jan 27, 2022
1 parent aeea6ba commit 782ca6a
Show file tree
Hide file tree
Showing 49 changed files with 1,262 additions and 1,210 deletions.
6 changes: 3 additions & 3 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ ::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) {
switch (stResp.status().code()) {
case Status::Code::kLeaderChanged:
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
case Status::Code::kError:
return nebula::cpp2::ErrorCode::E_RPC_FAILURE;
default:
LOG(ERROR) << "not impl error transform: code="
<< static_cast<int32_t>(stResp.status().code());
Expand Down Expand Up @@ -69,8 +71,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
VLOG(1) << "chainUpdateEdge rpc: " << apache::thrift::util::enumNameSafe(code);
if (code == ::nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainUpdateEdge(reversedRequest, termOfSrc, optVersion, std::move(p));
} else {
p.setValue(code);
Expand Down Expand Up @@ -108,7 +110,6 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainAddEdges(directReq, termId, optVersion, std::move(p));
} else {
p.setValue(code);
Expand Down Expand Up @@ -165,7 +166,6 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainDeleteEdges(req, txnId, termId, std::move(p));
} else {
p.setValue(code);
Expand Down
14 changes: 4 additions & 10 deletions src/common/utils/MemoryLockWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class MemoryLockGuard {
}

~MemoryLockGuard() {
if (locked_) {
if (locked_ && autoUnlock_) {
lock_->unlockBatch(keys_);
}
}
Expand All @@ -71,22 +71,16 @@ class MemoryLockGuard {
return *iter_;
}

// this will manual set the lock to unlocked state
// which mean will not release all locks automatically
// please make sure you really know the side effect
void forceLock() {
locked_ = true;
}

void forceUnlock() {
locked_ = false;
// Controls whether the destructor releases the keys held by this guard.
// The destructor only calls lock_->unlockBatch(keys_) when both locked_ and
// autoUnlock_ are true, so after setAutoUnlock(false) the keys stay locked
// once this guard is destroyed. The default (autoUnlock_ == true) keeps the
// release-on-destruction behavior.
void setAutoUnlock(bool autoUnlock) {
autoUnlock_ = autoUnlock;
}

protected:
MemoryLockCore<Key>* lock_;
std::vector<Key> keys_;
typename std::vector<Key>::iterator iter_;
bool locked_{false};
bool autoUnlock_{true};
};

} // namespace nebula
1 change: 1 addition & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ enum ErrorCode {
E_RAFT_WRITE_BLOCKED = -3528,
E_RAFT_BUFFER_OVERFLOW = -3529,
E_RAFT_ATOMIC_OP_FAILED = -3530,
E_LEADER_LEASE_FAILED = -3531,

E_UNKNOWN = -8000,
} (cpp.enum_strict)
3 changes: 2 additions & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,8 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
}
auto part = nebula::value(ret);
if (!checkLeader(part, canReadFromFollower)) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED
: nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->get(key, value);
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/upgrade/v2/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct SpaceDesc {
3: i32 replica_factor = 0,
4: binary charset_name,
5: binary collate_name,
6: ColumnTypeDef vid_type = {"type": PropertyType.FIXED_STRING, "type_length": 8},
6: ColumnTypeDef vid_type = {"type": "PropertyType.FIXED_STRING", "type_length": 8},
7: optional binary group_name,
8: optional IsolationLevel isolation_level,
9: optional binary comment,
Expand Down Expand Up @@ -78,4 +78,4 @@ struct ColumnTypeDef {
enum IsolationLevel {
DEFAULT = 0x00, // allow add half edge(either in or out edge succeeded)
TOSS = 0x01, // add in and out edge atomic
} (cpp.enum_strict)
} (cpp.enum_strict)
1 change: 1 addition & 0 deletions src/mock/MockCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ void MockCluster::initStorageKV(const char* dataPath,

txnMan_ = std::make_unique<storage::TransactionManager>(storageEnv_.get());
storageEnv_->txnMan_ = txnMan_.get();
txnMan_->start();
}

void MockCluster::startStorage(HostAddr addr,
Expand Down
9 changes: 4 additions & 5 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ nebula_add_library(
transaction/ConsistUtil.cpp
transaction/ChainUpdateEdgeLocalProcessor.cpp
transaction/ChainUpdateEdgeRemoteProcessor.cpp
transaction/ChainResumeProcessor.cpp
transaction/ChainAddEdgesGroupProcessor.cpp
transaction/ChainAddEdgesLocalProcessor.cpp
transaction/ChainAddEdgesRemoteProcessor.cpp
transaction/ResumeAddEdgeProcessor.cpp
transaction/ResumeAddEdgeRemoteProcessor.cpp
transaction/ResumeUpdateProcessor.cpp
transaction/ResumeUpdateRemoteProcessor.cpp
transaction/ChainResumeAddPrimeProcessor.cpp
transaction/ChainResumeAddDoublePrimeProcessor.cpp
transaction/ChainResumeUpdatePrimeProcessor.cpp
transaction/ChainResumeUpdateDoublePrimeProcessor.cpp
transaction/ChainProcessorFactory.cpp
transaction/ChainDeleteEdgesGroupProcessor.cpp
transaction/ChainDeleteEdgesLocalProcessor.cpp
Expand Down
7 changes: 4 additions & 3 deletions src/storage/InternalStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ class InternalStorageServiceHandler final : public cpp2::InternalStorageServiceS
public:
explicit InternalStorageServiceHandler(StorageEnv* env);

folly::Future<cpp2::ExecResponse> future_chainAddEdges(const cpp2::ChainAddEdgesRequest& p_req);
folly::Future<cpp2::ExecResponse> future_chainAddEdges(
const cpp2::ChainAddEdgesRequest& p_req) override;

folly::Future<cpp2::UpdateResponse> future_chainUpdateEdge(
const cpp2::ChainUpdateEdgeRequest& p_req);
const cpp2::ChainUpdateEdgeRequest& p_req) override;

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& p_req);
const cpp2::ChainDeleteEdgesRequest& p_req) override;

private:
StorageEnv* env_{nullptr};
Expand Down
1 change: 1 addition & 0 deletions src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ void StorageServer::stop() {

if (txnMan_) {
txnMan_->stop();
txnMan_->join();
}
if (taskMgr_) {
taskMgr_->shutdown();
Expand Down
3 changes: 3 additions & 0 deletions src/storage/index/LookupProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class LookupProcessor : public BaseProcessor<cpp2::LookupIndexResp> {
folly::Executor* executor_{nullptr};
std::unique_ptr<PlanContext> planContext_;
std::unique_ptr<RuntimeContext> context_;
/**
* @brief the final output
*/
nebula::DataSet resultDataSet_;
std::vector<nebula::DataSet> partResults_;
};
Expand Down
3 changes: 3 additions & 0 deletions src/storage/kv/GetProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace storage {

extern ProcessorCounters kGetCounters;

/**
* @brief this is a simple get() interface when storage run in KV mode.
*/
class GetProcessor : public BaseProcessor<cpp2::KVGetResponse> {
public:
static GetProcessor* instance(StorageEnv* env,
Expand Down
4 changes: 3 additions & 1 deletion src/storage/kv/PutProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ namespace nebula {
namespace storage {

extern ProcessorCounters kPutCounters;

/**
* @brief this is a simple put() interface when storage run in KV mode.
*/
class PutProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
static PutProcessor* instance(StorageEnv* env,
Expand Down
3 changes: 3 additions & 0 deletions src/storage/kv/RemoveProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace storage {

extern ProcessorCounters kRemoveCounters;

/**
* @brief this is a simple remove() interface when storage run in KV mode.
*/
class RemoveProcessor : public BaseProcessor<cpp2::ExecResponse> {
public:
static RemoveProcessor* instance(StorageEnv* env,
Expand Down
15 changes: 0 additions & 15 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -753,21 +753,6 @@ nebula_add_executable(
gtest
)

nebula_add_executable(
NAME
chain_resume_edge_test
SOURCES
ChainResumeEdgeTest.cpp
OBJECTS
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)

nebula_add_executable(
NAME
storage_index_write_bm
Expand Down
48 changes: 15 additions & 33 deletions src/storage/test/ChainAddEdgesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace storage {
constexpr int32_t mockSpaceId = 1;
constexpr int32_t mockPartNum = 1;
constexpr int32_t fackTerm = 1;
constexpr auto suc = nebula::cpp2::ErrorCode::SUCCEEDED;

// make sure test class works well
TEST(ChainAddEdgesTest, TestUtilsTest) {
Expand All @@ -38,23 +39,23 @@ TEST(ChainAddEdgesTest, TestUtilsTest) {
env->metaClient_ = mClient.get();
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);

auto* processor = new FakeChainAddEdgesLocalProcessor(env);
auto* proc = new FakeChainAddEdgesLocalProcessor(env);

processor->rcPrepareLocal = nebula::cpp2::ErrorCode::SUCCEEDED;
processor->rcProcessRemote = nebula::cpp2::ErrorCode::SUCCEEDED;
processor->rcProcessLocal = nebula::cpp2::ErrorCode::SUCCEEDED;
proc->setPrepareCode(suc);
proc->setRemoteCode(suc);
proc->setCommitCode(suc);

LOG(INFO) << "Build AddEdgesRequest...";
cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 1);

LOG(INFO) << "Test AddEdgesProcessor...";
auto fut = processor->getFuture();
processor->process(req);
auto fut = proc->getFuture();
proc->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.failed_parts.size());

LOG(INFO) << "Check data in kv store...";
// The number of data in serve is 334
EXPECT_EQ(0, resp.result.failed_parts.size());
checkAddEdgesData(req, env, 0, 0);
}

Expand All @@ -68,7 +69,7 @@ TEST(ChainAddEdgesTest, prepareLocalSucceedTest) {
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);
auto* proc = new FakeChainAddEdgesLocalProcessor(env);

proc->rcProcessRemote = nebula::cpp2::ErrorCode::E_RPC_FAILURE;
proc->setRemoteCode(nebula::cpp2::ErrorCode::E_RPC_FAILURE);

LOG(INFO) << "Build AddEdgesRequest...";
cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 1);
Expand Down Expand Up @@ -127,7 +128,7 @@ TEST(ChainAddEdgesTest, processRemoteFailedTest) {
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);

auto* proc = new FakeChainAddEdgesLocalProcessor(env);
proc->rcProcessRemote = nebula::cpp2::ErrorCode::E_OUTDATED_TERM;
proc->setRemoteCode(nebula::cpp2::ErrorCode::E_OUTDATED_TERM);

LOG(INFO) << "Build AddEdgesRequest...";
cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 1);
Expand All @@ -136,16 +137,14 @@ TEST(ChainAddEdgesTest, processRemoteFailedTest) {
auto fut = proc->getFuture();
proc->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(1, resp.result.failed_parts.size());
EXPECT_EQ(0, resp.result.failed_parts.size());

ChainTestUtils util;
// none of really edge key should be inserted
EXPECT_EQ(0, numOfKey(req, util.genKey, env));
// prime key should be deleted
EXPECT_EQ(0, numOfKey(req, util.genPrime, env));
EXPECT_EQ(0, numOfKey(req, util.genDoublePrime, env));

// env->txnMan_->stop();
}

TEST(ChainAddEdgesTest, processRemoteUnknownTest) {
Expand All @@ -159,7 +158,7 @@ TEST(ChainAddEdgesTest, processRemoteUnknownTest) {

auto* proc = new FakeChainAddEdgesLocalProcessor(env);

proc->rcProcessRemote = nebula::cpp2::ErrorCode::E_RPC_FAILURE;
proc->setRemoteCode(nebula::cpp2::ErrorCode::E_RPC_FAILURE);

LOG(INFO) << "Build AddEdgesRequest...";
cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 1);
Expand All @@ -168,39 +167,22 @@ TEST(ChainAddEdgesTest, processRemoteUnknownTest) {
auto fut = proc->getFuture();
proc->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.failed_parts.size());

ChainTestUtils util;
// none of really edge key should be inserted
EXPECT_EQ(0, resp.result.failed_parts.size());
EXPECT_EQ(334, numOfKey(req, util.genKey, env));
// prime key should be deleted
EXPECT_EQ(0, numOfKey(req, util.genPrime, env));
EXPECT_EQ(334, numOfKey(req, util.genDoublePrime, env));
}

// Builds the reversed form of a mocked AddEdges request via reverseRequestForward().
// NOTE(review): the reversed request is never inspected and the test contains no
// EXPECT/ASSERT, so it only checks that the call completes.
TEST(ChainAddEdgesTest, processRemoteTest) {
fs::TempDir rootPath("/tmp/AddEdgesTest.XXXXXX");
mock::MockCluster cluster;
cluster.initStorageKV(rootPath.path());
auto* env = cluster.storageEnv_.get();
auto mClient = MetaClientTestUpdater::makeDefault();

env->metaClient_ = mClient.get();
MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm);

auto* proc = new FakeChainAddEdgesLocalProcessor(env);
LOG(INFO) << "Build AddEdgesRequest...";
cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 1);

auto reversedRequest = proc->reverseRequestForward(req);
// process() is never called on proc in this test; it is released by hand here.
delete proc;
}

} // namespace storage
} // namespace nebula

int main(int argc, char** argv) {
FLAGS_trace_toss = true;
FLAGS_v = 1;
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
google::SetStderrLogging(google::INFO);
Expand Down
Loading

0 comments on commit 782ca6a

Please sign in to comment.