diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp
index 795ade3c73c..1bf953db7f9 100644
--- a/src/meta/processors/job/JobManager.cpp
+++ b/src/meta/processors/job/JobManager.cpp
@@ -144,7 +144,10 @@ void JobManager::scheduleThread() {
 
 // @return: true if all task dispatched, else false
 bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
-  std::lock_guard<std::mutex> lk(muJobFinished_);
+  // A recursive_mutex is needed here: if all balance tasks succeed but the job is stopped
+  // before its status is updated, the balance tasks re-acquire muJobFinished_ through the
+  // finish callback on the same thread.
+  std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
   std::unique_ptr<JobExecutor> je =
       JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
   JobExecutor* jobExec = je.get();
@@ -174,7 +177,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
   if (jobExec->isMetaJob()) {
     jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) {
       if (status == meta::cpp2::JobStatus::STOPPED) {
-        std::lock_guard<std::mutex> lkg(muJobFinished_);
+        std::lock_guard<std::recursive_mutex> lkg(muJobFinished_);
         cleanJob(jobDesc.getJobId());
         return nebula::cpp2::ErrorCode::SUCCEEDED;
       } else {
@@ -206,7 +209,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job
   LOG(INFO) << folly::sformat(
       "{}, jobId={}, result={}", __func__, jobId, apache::thrift::util::enumNameSafe(jobStatus));
   // normal job finish may race to job stop
-  std::lock_guard<std::mutex> lk(muJobFinished_);
+  std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
   auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_);
   if (!nebula::ok(optJobDescRet)) {
     LOG(WARNING) << folly::sformat("can't load job, jobId={}", jobId);
@@ -414,12 +417,7 @@ size_t JobManager::jobSize() const {
 }
 
 bool JobManager::try_dequeue(std::pair<JbOp, JobID>& opJobId) {
-  if (highPriorityQueue_->try_dequeue(opJobId)) {
-    return true;
-  } else if (lowPriorityQueue_->try_dequeue(opJobId)) {
-    return true;
-  }
-  return false;
+  return (highPriorityQueue_->try_dequeue(opJobId)) || (lowPriorityQueue_->try_dequeue(opJobId));
 }
 
 void JobManager::enqueue(const JbOp& op, const JobID& jobId, const cpp2::AdminCmd& cmd) {
diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h
index 52e73584058..fd07485e00a 100644
--- a/src/meta/processors/job/JobManager.h
+++ b/src/meta/processors/job/JobManager.h
@@ -158,7 +158,7 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
   AdminClient* adminClient_{nullptr};
 
   std::mutex muReportFinish_;
-  std::mutex muJobFinished_;
+  std::recursive_mutex muJobFinished_;
   std::atomic<JbmgrStatus> status_ = JbmgrStatus::NOT_START;
 };
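Note on the muJobFinished_ change: the re-entrancy described in the new comment is easiest to see in isolation. Below is a minimal standalone sketch (illustrative names, not NebulaGraph code) of why the second same-thread acquisition needs std::recursive_mutex:

```cpp
// Standalone sketch of the re-entrancy that motivates std::recursive_mutex.
// Illustrative names only; this is not NebulaGraph code.
#include <functional>
#include <iostream>
#include <mutex>

class Runner {
 public:
  void run() {
    // Equivalent of runJobInternal(): the lock is held while the job runs.
    std::lock_guard<std::recursive_mutex> lk(mu_);
    // If the job finishes synchronously (e.g. all balance tasks already
    // succeeded), the finish callback fires on this same thread ...
    onFinished_();
  }

  void setOnFinished(std::function<void()> cb) { onFinished_ = std::move(cb); }

  void cleanUp() {
    // ... and re-acquires the mutex. With std::mutex, locking twice on the
    // same thread is undefined behavior (in practice, a deadlock); with
    // std::recursive_mutex it just increments the ownership count.
    std::lock_guard<std::recursive_mutex> lk(mu_);
    std::cout << "cleaned up under the re-entrant lock\n";
  }

 private:
  std::recursive_mutex mu_;
  std::function<void()> onFinished_;
};

int main() {
  Runner r;
  r.setOnFinished([&r] { r.cleanUp(); });
  r.run();  // completes instead of deadlocking
}
```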
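On the try_dequeue simplification: `||` evaluates left to right and short-circuits, so the single-expression form preserves the "high-priority queue first" semantics of the original if/else chain. A minimal sketch with a stand-in queue type (illustrative; only the try_dequeue contract matters here):

```cpp
// Stand-in queue type; only the bool-returning try_dequeue contract matters.
#include <iostream>
#include <queue>

template <typename T>
struct TryQueue {
  std::queue<T> q;
  bool try_dequeue(T& out) {
    if (q.empty()) return false;
    out = q.front();
    q.pop();
    return true;
  }
};

int main() {
  TryQueue<int> high, low;
  high.q.push(1);
  low.q.push(2);

  int v = 0;
  // Same shape as the rewritten JobManager::try_dequeue: the right-hand side
  // runs only if the high-priority dequeue returned false.
  bool ok = high.try_dequeue(v) || low.try_dequeue(v);
  std::cout << ok << " " << v << "\n";  // prints "1 1": high-priority item wins
}
```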
diff --git a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp
index d55144d6bc3..0ead29c773b 100644
--- a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp
+++ b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp
@@ -87,7 +87,7 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> LeaderBalanceJobExecutor::getHostParts(Gr
   int32_t replica = properties.get_replica_factor();
   LOG(INFO) << "Replica " << replica;
   if (dependentOnZone && !properties.get_zone_names().empty()) {
-    auto zoneNames = properties.get_zone_names();
+    const auto& zoneNames = properties.get_zone_names();
     int32_t zoneSize = zoneNames.size();
     LOG(INFO) << "Zone Size " << zoneSize;
     auto activeHostsRet = ActiveHostsMan::getActiveHostsWithZones(kvstore_, spaceId);
@@ -162,8 +162,8 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::assembleZoneParts(
     auto& hosts = zoneIter->second;
     auto name = zoneIter->first.second;
     zoneHosts_[name] = hosts;
-    for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) {
-      auto partIter = hostParts.find(*hostIter);
+    for (auto& hostIter : hosts) {
+      auto partIter = hostParts.find(hostIter);
       LOG(INFO) << "Zone " << name << " have the host " << it->first;
       if (partIter == hostParts.end()) {
         zoneParts_[it->first] = ZoneNameAndParts(name, std::vector<PartitionID>());
@@ -173,11 +173,10 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::assembleZoneParts(
     }
   }
 
-  for (auto it = zoneHosts.begin(); it != zoneHosts.end(); it++) {
-    auto host = it->first.first;
-    auto& hosts = it->second;
-    for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) {
-      auto h = *hostIter;
+  for (auto& zoneHost : zoneHosts) {
+    auto host = zoneHost.first.first;
+    auto& hosts = zoneHost.second;
+    for (auto h : hosts) {
       auto iter = std::find_if(hostParts.begin(), hostParts.end(), [h](const auto& pair) -> bool {
         return h == pair.first;
       });
@@ -198,11 +197,11 @@ void LeaderBalanceJobExecutor::calDiff(const HostParts& hostParts,
                                        const std::vector<HostAddr>& activeHosts,
                                        std::vector<HostAddr>& expand,
                                        std::vector<HostAddr>& lost) {
-  for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
-    VLOG(1) << "Original Host " << it->first << ", parts " << it->second.size();
-    if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end() &&
-        std::find(lost.begin(), lost.end(), it->first) == lost.end()) {
-      lost.emplace_back(it->first);
+  for (const auto& hostPart : hostParts) {
+    VLOG(1) << "Original Host " << hostPart.first << ", parts " << hostPart.second.size();
+    if (std::find(activeHosts.begin(), activeHosts.end(), hostPart.first) == activeHosts.end() &&
+        std::find(lost.begin(), lost.end(), hostPart.first) == lost.end()) {
+      lost.emplace_back(hostPart.first);
     }
   }
   for (auto& h : activeHosts) {
@@ -220,7 +219,7 @@ LeaderBalanceJobExecutor::LeaderBalanceJobExecutor(JobID jobId,
     : MetaJobExecutor(jobId, kvstore, adminClient, params),
      inLeaderBalance_(false),
      hostLeaderMap_(nullptr) {
-  executor_.reset(new folly::CPUThreadPoolExecutor(1));
+  executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(1);
 }
 
 nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::finish(bool ret) {
@@ -243,7 +242,7 @@ folly::Future<Status> LeaderBalanceJobExecutor::executeInternal() {
 
   bool expected = false;
   if (inLeaderBalance_.compare_exchange_strong(expected, true)) {
-    hostLeaderMap_.reset(new HostLeaderMap);
+    hostLeaderMap_ = std::make_unique<HostLeaderMap>();
    auto status = adminClient_->getLeaderDist(hostLeaderMap_.get()).get();
    if (!status.ok() || hostLeaderMap_->empty()) {
      inLeaderBalance_ = false;
@@ -287,6 +286,7 @@ folly::Future<Status> LeaderBalanceJobExecutor::executeInternal() {
 
     inLeaderBalance_ = false;
     if (failed != 0) {
+      executorOnFinished_(meta::cpp2::JobStatus::FAILED);
       return Status::Error("partiton failed to transfer leader");
     }
     executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
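On the getHostParts change: the Thrift-generated getter returns a reference to the stored field, so `auto zoneNames = ...` deep-copies the vector at every call site, while `const auto&` binds to the member without copying. A reduced sketch (the hypothetical `Properties` struct below stands in for the generated class):

```cpp
#include <string>
#include <vector>

struct Properties {
  // Mirrors a Thrift getter: returns a reference to the stored field.
  const std::vector<std::string>& get_zone_names() const { return zoneNames_; }
  std::vector<std::string> zoneNames_{"zone1", "zone2", "zone3"};
};

int main() {
  Properties properties;

  auto zoneNames1 = properties.get_zone_names();         // deep-copies the vector
  const auto& zoneNames2 = properties.get_zone_names();  // binds to the member, no copy

  // Both are safe to use while `properties` is alive; only the first pays
  // for an allocation and element-wise copy.
  return zoneNames1.size() == zoneNames2.size() ? 0 : 1;
}
```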
@@ -354,10 +354,10 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> LeaderBalanceJobExecutor::buildLeaderBala
   }
 
   if (dependentOnZone) {
-    for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) {
-      auto min = it->second.size() / replicaFactor;
-      VLOG(3) << "Host: " << it->first << " Bounds: " << min << " : " << min + 1;
-      hostBounds_[it->first] = std::make_pair(min, min + 1);
+    for (auto& allHostPart : allHostParts) {
+      auto min = allHostPart.second.size() / replicaFactor;
+      VLOG(3) << "Host: " << allHostPart.first << " Bounds: " << min << " : " << min + 1;
+      hostBounds_[allHostPart.first] = std::make_pair(min, min + 1);
     }
   } else {
     size_t activeSize = activeHosts.size();
@@ -377,8 +377,8 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> LeaderBalanceJobExecutor::buildLeaderBala
     VLOG(3) << "Build leader balance plan, expected min load: " << globalMin
             << ", max load: " << globalMax << " avg: " << globalAvg;
 
-    for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) {
-      hostBounds_[it->first] = std::make_pair(globalMin, globalMax);
+    for (auto& allHostPart : allHostParts) {
+      hostBounds_[allHostPart.first] = std::make_pair(globalMin, globalMax);
     }
   }
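On the range-for rewrites in buildLeaderBalancePlan (and earlier in assembleZoneParts): with map-like containers the element type is `std::pair<const Key, Value>`, so `auto&` binds each entry by reference and avoids copying the key and the parts vector on every iteration, whereas `for (auto h : hosts)` copies each element, acceptable for small values like a host address, but worth noting in review. A reduced sketch (a std::map stands in for the real HostParts typedef):

```cpp
// Illustrative container types; the real HostParts typedef lives in the
// meta code and maps host addresses to partition lists.
#include <map>
#include <string>
#include <vector>

int main() {
  std::map<std::string, std::vector<int>> allHostParts{{"h1", {1, 2}}, {"h2", {3}}};

  // `auto&` binds each element (a std::pair<const std::string, std::vector<int>>)
  // by reference: no per-iteration copy of the key or the parts vector.
  for (auto& allHostPart : allHostParts) {
    (void)allHostPart.second.size();
  }

  // `for (auto h : hosts)` copies each element; fine for small values, but
  // `const auto&` is the safer default when the loop only reads.
  std::vector<std::string> hosts{"h1", "h2"};
  for (const auto& h : hosts) {
    (void)h;
  }
}
```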