Skip to content

Commit

Permalink
fix job manager ut in asan (#4678)
Browse files Browse the repository at this point in the history
* fix job manager ut in asan

* fix SuperYoko's comments

* address comments

Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
panda-sheep and critical27 authored Sep 27, 2022
1 parent a5bed33 commit 69a6d87
Showing 1 changed file with 33 additions and 32 deletions.
65 changes: 33 additions & 32 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,14 +543,16 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) {
}

TEST_F(JobManagerTest, RecoverJob) {
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_.store(JobManager::JbmgrStatus::STOPPED, std::memory_order_release);
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 3;
int32_t base = 0;

// case 1,recover Queue status job
{
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_ = JobManager::JbmgrStatus::STOPPED;
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 3;
for (auto jobId = 1; jobId <= nJob; ++jobId) {
JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH);
jd.setStatus(cpp2::JobStatus::STOPPED);
Expand Down Expand Up @@ -589,19 +591,18 @@ TEST_F(JobManagerTest, RecoverJob) {
}
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr);
ASSERT_EQ(nebula::value(nJobRecovered), 3);
std::tuple<JobManager::JbOp, JobID, GraphSpaceID> opJobId;
while (jobMgr->jobSize() != 0) {
jobMgr->tryDequeue(opJobId);
}
}

// case 2
// For the balance job, if there are stopped jobs and failed jobs in turn
// only recover the last balance job
base = 10;
{
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_ = JobManager::JbmgrStatus::STOPPED;
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 3;
for (auto jobId = 1; jobId <= nJob; ++jobId) {
for (auto jobId = base + 1; jobId <= base + nJob; ++jobId) {
cpp2::JobStatus jobStatus = cpp2::JobStatus::STOPPED;
JobDescription jd(spaceId, jobId, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
Expand All @@ -613,9 +614,9 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) {
for (auto jobId = base + nJob + 1; jobId <= base + nJob + 3; ++jobId) {
cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED;
JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus);
JobDescription jd(spaceId, jobId, cpp2::JobType::STATS, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -627,7 +628,7 @@ TEST_F(JobManagerTest, RecoverJob) {
}
{
cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED;
JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
JobDescription jd(spaceId, base + nJob + 4, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -637,28 +638,28 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1});
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 1});
ASSERT_EQ(nebula::value(nJobRecovered), 0);
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {2});
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 2});
ASSERT_EQ(nebula::value(nJobRecovered), 0);
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {3});
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 3});
ASSERT_EQ(nebula::value(nJobRecovered), 0);

nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {7});
nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + nJob + 4});
ASSERT_EQ(nebula::value(nJobRecovered), 1);

std::tuple<JobManager::JbOp, JobID, GraphSpaceID> opJobId;
while (jobMgr->jobSize() != 0) {
jobMgr->tryDequeue(opJobId);
}
}

// case 3
// For the balance jobs, if there is a newer balance job, the failed or stopped jobs can't be
// recovered
base = 20;
{
std::unique_ptr<JobManager, std::function<void(JobManager*)>> jobMgr = getJobManager();
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_ = JobManager::JbmgrStatus::STOPPED;
jobMgr->bgThread_.join();
GraphSpaceID spaceId = 1;
int32_t nJob = 4;
for (auto jobId = 1; jobId <= nJob; ++jobId) {
for (auto jobId = base + 1; jobId <= base + nJob + 1; ++jobId) {
cpp2::JobStatus jobStatus;
if (jobId / 2) {
jobStatus = cpp2::JobStatus::FAILED;
Expand All @@ -675,7 +676,7 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) {
for (auto jobId = base + nJob + 2; jobId <= base + nJob + 4; ++jobId) {
cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED;
JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
Expand All @@ -689,7 +690,7 @@ TEST_F(JobManagerTest, RecoverJob) {
}
{
cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED;
JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
JobDescription jd(spaceId, base + nJob + 5, cpp2::JobType::ZONE_BALANCE, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -701,7 +702,7 @@ TEST_F(JobManagerTest, RecoverJob) {
}
{
cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED;
JobDescription jd(spaceId, 8, cpp2::JobType::COMPACT, {}, jobStatus);
JobDescription jd(spaceId, base + nJob + 6, cpp2::JobType::COMPACT, {}, jobStatus);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand All @@ -711,11 +712,11 @@ TEST_F(JobManagerTest, RecoverJob) {
jd.getErrorCode());
jobMgr->save(jobKey, jobVal);
}
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1});
auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 1});
ASSERT_EQ(nebula::value(nJobRecovered), 0);

nJobRecovered = jobMgr->recoverJob(spaceId, nullptr);
ASSERT_EQ(nebula::value(nJobRecovered), 2);
ASSERT_EQ(nebula::value(nJobRecovered), 1);
}
}

Expand Down

0 comments on commit 69a6d87

Please sign in to comment.