Skip to content

Commit

Permalink
Merge branch 'ttl-check-time' of github.com:pengweisong/nebula into t…
Browse files Browse the repository at this point in the history
…tl-check-time
  • Loading branch information
pengweisong committed Oct 9, 2022
2 parents e37bf51 + 8fa710d commit ad0a5a4
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/graph/executor/admin/ShowHostsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ folly::Future<Status> ShowHostsExecutor::showHosts() {
for (const auto &host : hostVec) {
nebula::Row r({host.get_hostAddr().host,
host.get_hostAddr().port,
FLAGS_ws_http_port,
19779, // FIXME: update real http port.
apache::thrift::util::enumNameSafe(host.get_status())});
int64_t leaderCount = 0;
for (const auto &spaceEntry : host.get_leader_parts()) {
Expand Down
65 changes: 33 additions & 32 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,14 +543,16 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) {
}

TEST_F(JobManagerTest, RecoverJob) {
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_.store(JobManager::JbmgrStatus::STOPPED, std::memory_order_release);
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 3;
int32_t base = 0;

// case 1,recover Queue status job
{
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_ = JobManager::JbmgrStatus::STOPPED;
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 3;
for (auto jobId = 1; jobId <= nJob; ++jobId) {
JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH);
jd.setStatus(cpp2::JobStatus::STOPPED);
Expand Down Expand Up @@ -589,19 +591,18 @@ TEST_F(JobManagerTest, RecoverJob) {
}
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr);
ASSERT_EQ(nebula::value(nJobRecovered), 3);
std::tuple<JobManager::JbOp, JobID, GraphSpaceID> opJobId;
while (jobMgr->jobSize() != 0) {
jobMgr->tryDequeue(opJobId);
}
}

// case 2
// For the balance job, if there are stopped jobs and failed jobs in turn
// only recover the last balance job
base = 10;
{
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_ = JobManager::JbmgrStatus::STOPPED;
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 3;
for (auto jobId = 1; jobId <= nJob; ++jobId) {
for (auto jobId = base + 1; jobId <= base + nJob; ++jobId) {
cpp2::JobStatus jobStatus = cpp2::JobStatus::STOPPED;
JobDescription jd(spaceId, jobId, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
Expand All @@ -613,9 +614,9 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) {
for (auto jobId = base + nJob + 1; jobId <= base + nJob + 3; ++jobId) {
cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED;
JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus);
JobDescription jd(spaceId, jobId, cpp2::JobType::STATS, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -627,7 +628,7 @@ TEST_F(JobManagerTest, RecoverJob) {
}
{
cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED;
JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
JobDescription jd(spaceId, base + nJob + 4, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -637,28 +638,28 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1});
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 1});
ASSERT_EQ(nebula::value(nJobRecovered), 0);
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {2});
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 2});
ASSERT_EQ(nebula::value(nJobRecovered), 0);
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {3});
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 3});
ASSERT_EQ(nebula::value(nJobRecovered), 0);

nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {7});
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + nJob + 4});
ASSERT_EQ(nebula::value(nJobRecovered), 1);

std::tuple<JobManager::JbOp, JobID, GraphSpaceID> opJobId;
while (jobMgr->jobSize() != 0) {
jobMgr->tryDequeue(opJobId);
}
}

// case 3
// For the balance jobs, if there is a newer balance job, the failed or stopped jobs can't be
// recovered
base = 20;
{
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_ = JobManager::JbmgrStatus::STOPPED;
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 4;
for (auto jobId = 1; jobId <= nJob; ++jobId) {
for (auto jobId = base + 1; jobId <= base + nJob + 1; ++jobId) {
cpp2::JobStatus jobStatus;
if (jobId / 2) {
jobStatus = cpp2::JobStatus::FAILED;
Expand All @@ -675,7 +676,7 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) {
for (auto jobId = base + nJob + 2; jobId <= base + nJob + 4; ++jobId) {
cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED;
JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
Expand All @@ -689,7 +690,7 @@ TEST_F(JobManagerTest, RecoverJob) {
}
{
cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED;
JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
JobDescription jd(spaceId, base + nJob + 5, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -701,7 +702,7 @@ TEST_F(JobManagerTest, RecoverJob) {
}
{
cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED;
JobDescription jd(spaceId, 8, cpp2::JobType::COMPACT, {}, jobStatus);
JobDescription jd(spaceId, base + nJob + 6, cpp2::JobType::COMPACT, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -711,11 +712,11 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1});
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 1});
ASSERT_EQ(nebula::value(nJobRecovered), 0);

nJobRecovered = jobMgr->recoverJob(spaceId, nullptr);
ASSERT_EQ(nebula::value(nJobRecovered), 2);
ASSERT_EQ(nebula::value(nJobRecovered), 1);
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) {
void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
const auto& partVertices = req.get_parts();
const auto& propNamesMap = req.get_prop_names();
bool onlyVertex = propNamesMap.empty();
for (auto& part : partVertices) {
auto partId = part.first;
const auto& vertices = part.second;
Expand All @@ -81,7 +82,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
code = nebula::cpp2::ErrorCode::E_INVALID_VID;
break;
}
if (FLAGS_use_vertex_key) {
if (onlyVertex && FLAGS_use_vertex_key) {
data.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), "");
}
for (auto& newTag : newTags) {
Expand Down Expand Up @@ -139,6 +140,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) {
void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& req) {
const auto& partVertices = req.get_parts();
const auto& propNamesMap = req.get_prop_names();
bool onlyVertex = propNamesMap.empty();

for (const auto& part : partVertices) {
auto partId = part.first;
Expand All @@ -161,7 +163,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
code = nebula::cpp2::ErrorCode::E_INVALID_VID;
break;
}
if (FLAGS_use_vertex_key) {
if (onlyVertex && FLAGS_use_vertex_key) {
verticeData.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid));
}
for (const auto& newTag : newTags) {
Expand Down

0 comments on commit ad0a5a4

Please sign in to comment.