From 27afd7b4bfb5a345a6b41ebf4deec18ccc30ab1a Mon Sep 17 00:00:00 2001
From: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com>
Date: Mon, 21 Mar 2022 19:37:46 +0800
Subject: [PATCH] fix bug stop job before it start

---
 src/meta/processors/job/JobDescription.cpp | 2 +-
 src/meta/processors/job/JobManager.cpp     | 9 +++++----
 src/meta/processors/job/JobManager.h       | 1 +
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/meta/processors/job/JobDescription.cpp b/src/meta/processors/job/JobDescription.cpp
index 4e9d2cedd67..913a02e5c82 100644
--- a/src/meta/processors/job/JobDescription.cpp
+++ b/src/meta/processors/job/JobDescription.cpp
@@ -77,7 +77,7 @@ bool JobDescription::setStatus(Status newStatus, bool force) {
     return false;
   }
   status_ = newStatus;
-  if (newStatus == Status::RUNNING) {
+  if (newStatus == Status::RUNNING || (newStatus == Status::STOPPED && startTime_ == 0)) {
     startTime_ = std::time(nullptr);
   }
   if (JobStatus::laterThan(newStatus, Status::RUNNING)) {
diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp
index 3515e6db5ad..5c4519590de 100644
--- a/src/meta/processors/job/JobManager.cpp
+++ b/src/meta/processors/job/JobManager.cpp
@@ -126,6 +126,7 @@ void JobManager::scheduleThread() {
     auto jobOp = std::get<0>(opJobId);
     auto jodId = std::get<1>(opJobId);
     auto spaceId = std::get<2>(opJobId);
+    std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
     auto jobDescRet = JobDescription::loadJobDescription(spaceId, jodId, kvStore_);
     if (!nebula::ok(jobDescRet)) {
       LOG(INFO) << "[JobManager] load an invalid job from space " << spaceId << " jodId " << jodId;
@@ -152,8 +153,6 @@
 }
 
 bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
-  auto spaceId = jobDesc.getSpace();
-  std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
   std::unique_ptr<JobExecutor> je =
       JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
   JobExecutor* jobExec = je.get();
@@ -257,8 +256,10 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,
 
   auto it = runningJobs_.find(jobId);
   if (it == runningJobs_.end()) {
-    LOG(INFO) << folly::sformat("Can't find jobExecutor, jobId={}", jobId);
-    return nebula::cpp2::ErrorCode::E_UNKNOWN;
+    // the job has not started yet
+    // TODO job not existing in runningJobs_ also means leader changed, we handle it later
+    cleanJob(jobId);
+    return nebula::cpp2::ErrorCode::SUCCEEDED;
   }
   std::unique_ptr<JobExecutor>& jobExec = it->second;
   if (jobStatus == cpp2::JobStatus::STOPPED) {
diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h
index f33d18cf4b5..50cf1243e73 100644
--- a/src/meta/processors/job/JobManager.h
+++ b/src/meta/processors/job/JobManager.h
@@ -281,6 +281,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
   AdminClient* adminClient_{nullptr};
 
   std::map<GraphSpaceID, std::recursive_mutex> muReportFinish_;
+  // Start & stop & finish a job need mutual exclusion
   // The reason of using recursive_mutex is that, it's possible for a meta job try to get this lock
   // in finish-callback in the same thread with runJobInternal
   std::map<GraphSpaceID, std::recursive_mutex> muJobFinished_;