Skip to content

Commit

Permalink
Fix bug when stopping a job before it starts
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Mar 24, 2022
1 parent dd270fb commit 2d15039
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ void JobManager::scheduleThread() {
auto jobOp = std::get<0>(opJobId);
auto jodId = std::get<1>(opJobId);
auto spaceId = std::get<2>(opJobId);
std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
auto jobDescRet = JobDescription::loadJobDescription(spaceId, jodId, kvStore_);
if (!nebula::ok(jobDescRet)) {
LOG(INFO) << "[JobManager] load an invalid job from space " << spaceId << " jodId " << jodId;
Expand All @@ -152,8 +153,6 @@ void JobManager::scheduleThread() {
}

bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
auto spaceId = jobDesc.getSpace();
std::lock_guard<std::recursive_mutex> lk(muJobFinished_[spaceId]);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();
Expand Down Expand Up @@ -257,8 +256,10 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(GraphSpaceID spaceId,

auto it = runningJobs_.find(jobId);
if (it == runningJobs_.end()) {
LOG(INFO) << folly::sformat("Can't find jobExecutor, jobId={}", jobId);
return nebula::cpp2::ErrorCode::E_UNKNOWN;
// The job has not started yet, so it has no entry in runningJobs_.
// TODO: a job missing from runningJobs_ can also mean the leader changed; handle that case later.
cleanJob(jobId);
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
std::unique_ptr<JobExecutor>& jobExec = it->second;
if (jobStatus == cpp2::JobStatus::STOPPED) {
Expand Down Expand Up @@ -513,10 +514,10 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJob

bool JobManager::isExpiredJob(JobDescription& jobDesc) {
auto status = jobDesc.getStatus();
if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING) {
auto jobStart = jobDesc.getStartTime();
if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING || jobStart == 0) {
return false;
}
auto jobStart = jobDesc.getStartTime();
auto duration = std::difftime(nebula::time::WallClock::fastNowInSec(), jobStart);
return duration > FLAGS_job_expired_secs;
}
Expand Down

0 comments on commit 2d15039

Please sign in to comment.