Commit

fix #3636
jackwener committed Jan 20, 2022
1 parent 46b2aac commit bc621d8
Showing 3 changed files with 29 additions and 31 deletions.
16 changes: 7 additions & 9 deletions src/meta/processors/job/JobManager.cpp
@@ -144,7 +144,10 @@ void JobManager::scheduleThread() {

// @return: true if all task dispatched, else false
bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
- std::lock_guard<std::mutex> lk(muJobFinished_);
+ // Why recursive_mutex: if all balance tasks succeed and the job is stopped before the job
+ // status is updated, the balance tasks will acquire muJobFinished_ through a callback on the
+ // same thread.
+ std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
JobExecutor* jobExec = je.get();
@@ -174,7 +177,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
if (jobExec->isMetaJob()) {
jobExec->setFinishCallBack([this, jobDesc](meta::cpp2::JobStatus status) {
if (status == meta::cpp2::JobStatus::STOPPED) {
- std::lock_guard<std::mutex> lkg(muJobFinished_);
+ std::lock_guard<std::recursive_mutex> lkg(muJobFinished_);
cleanJob(jobDesc.getJobId());
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
@@ -206,7 +209,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job
LOG(INFO) << folly::sformat(
"{}, jobId={}, result={}", __func__, jobId, apache::thrift::util::enumNameSafe(jobStatus));
// normal job finish may race to job stop
- std::lock_guard<std::mutex> lk(muJobFinished_);
+ std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_);
if (!nebula::ok(optJobDescRet)) {
LOG(WARNING) << folly::sformat("can't load job, jobId={}", jobId);
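
The three hunks above all switch the guard on muJobFinished_ from std::mutex to std::recursive_mutex. A minimal standalone sketch of the situation the commit comment describes, assuming the finish callback runs synchronously on the thread that already holds the lock (finishCallback and cleanJob below are illustrative stand-ins, not the actual JobManager API):

```cpp
#include <functional>
#include <iostream>
#include <mutex>

std::recursive_mutex muJobFinished;    // the fix: recursive instead of a plain std::mutex
std::function<void()> finishCallback;  // hypothetical stand-in for the job's finish callback

void cleanJob() {
  // Second acquisition on the same thread: fine for a recursive_mutex,
  // undefined behavior (typically a deadlock) for a plain std::mutex.
  std::lock_guard<std::recursive_mutex> lk(muJobFinished);
  std::cout << "job cleaned\n";
}

void runJobInternal() {
  std::lock_guard<std::recursive_mutex> lk(muJobFinished);  // first acquisition
  finishCallback();  // may call back into cleanJob() on this very thread
}

int main() {
  finishCallback = cleanJob;
  runJobInternal();
}
```

Releasing the lock before invoking the callback would be an alternative design; the commit keeps the locking structure and makes the mutex re-entrant instead.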
@@ -414,12 +417,7 @@ size_t JobManager::jobSize() const {
}

bool JobManager::try_dequeue(std::pair<JbOp, JobID>& opJobId) {
- if (highPriorityQueue_->try_dequeue(opJobId)) {
-   return true;
- } else if (lowPriorityQueue_->try_dequeue(opJobId)) {
-   return true;
- }
- return false;
+ return (highPriorityQueue_->try_dequeue(opJobId)) || (lowPriorityQueue_->try_dequeue(opJobId));
}

void JobManager::enqueue(const JbOp& op, const JobID& jobId, const cpp2::AdminCmd& cmd) {
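
The try_dequeue rewrite above relies on the left-to-right short-circuit evaluation of ||: the high-priority queue is still polled first, and the low-priority queue is only touched when that dequeue fails, so behavior is unchanged. A toy sketch of the same shape (SimpleQueue is a single-threaded stand-in, not the concurrent queue JobManager actually uses):

```cpp
#include <cassert>
#include <queue>

// Toy single-threaded stand-in for the concurrent queues used by JobManager.
struct SimpleQueue {
  std::queue<int> q;
  bool try_dequeue(int& out) {
    if (q.empty()) return false;
    out = q.front();
    q.pop();
    return true;
  }
};

// Same shape as the refactored JobManager::try_dequeue: high priority first,
// low priority only as a fallback, thanks to short-circuit evaluation.
bool tryDequeue(SimpleQueue& high, SimpleQueue& low, int& out) {
  return high.try_dequeue(out) || low.try_dequeue(out);
}

int main() {
  SimpleQueue high, low;
  low.q.push(2);
  high.q.push(1);
  int item = 0;
  assert(tryDequeue(high, low, item) && item == 1);  // high-priority item wins
  assert(tryDequeue(high, low, item) && item == 2);  // then the low-priority one
}
```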
2 changes: 1 addition & 1 deletion src/meta/processors/job/JobManager.h
@@ -158,7 +158,7 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
AdminClient* adminClient_{nullptr};

std::mutex muReportFinish_;
- std::mutex muJobFinished_;
+ std::recursive_mutex muJobFinished_;
std::atomic<JbmgrStatus> status_ = JbmgrStatus::NOT_START;
};

42 changes: 21 additions & 21 deletions src/meta/processors/job/LeaderBalanceJobExecutor.cpp
@@ -87,7 +87,7 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> LeaderBalanceJobExecutor::getHostParts(Gr
int32_t replica = properties.get_replica_factor();
LOG(INFO) << "Replica " << replica;
if (dependentOnZone && !properties.get_zone_names().empty()) {
- auto zoneNames = properties.get_zone_names();
+ const auto& zoneNames = properties.get_zone_names();
int32_t zoneSize = zoneNames.size();
LOG(INFO) << "Zone Size " << zoneSize;
auto activeHostsRet = ActiveHostsMan::getActiveHostsWithZones(kvstore_, spaceId);
@@ -162,8 +162,8 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::assembleZoneParts(
auto& hosts = zoneIter->second;
auto name = zoneIter->first.second;
zoneHosts_[name] = hosts;
- for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) {
-   auto partIter = hostParts.find(*hostIter);
+ for (auto& hostIter : hosts) {
+   auto partIter = hostParts.find(hostIter);
LOG(INFO) << "Zone " << name << " have the host " << it->first;
if (partIter == hostParts.end()) {
zoneParts_[it->first] = ZoneNameAndParts(name, std::vector<PartitionID>());
@@ -173,11 +173,10 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::assembleZoneParts(
}
}

- for (auto it = zoneHosts.begin(); it != zoneHosts.end(); it++) {
-   auto host = it->first.first;
-   auto& hosts = it->second;
-   for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) {
-     auto h = *hostIter;
+ for (auto& zoneHost : zoneHosts) {
+   auto host = zoneHost.first.first;
+   auto& hosts = zoneHost.second;
+   for (auto h : hosts) {
auto iter = std::find_if(hostParts.begin(), hostParts.end(), [h](const auto& pair) -> bool {
return h == pair.first;
});
@@ -198,11 +197,11 @@ void LeaderBalanceJobExecutor::calDiff(const HostParts& hostParts,
const std::vector<HostAddr>& activeHosts,
std::vector<HostAddr>& expand,
std::vector<HostAddr>& lost) {
- for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
-   VLOG(1) << "Original Host " << it->first << ", parts " << it->second.size();
-   if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end() &&
-       std::find(lost.begin(), lost.end(), it->first) == lost.end()) {
-     lost.emplace_back(it->first);
+ for (const auto& hostPart : hostParts) {
+   VLOG(1) << "Original Host " << hostPart.first << ", parts " << hostPart.second.size();
+   if (std::find(activeHosts.begin(), activeHosts.end(), hostPart.first) == activeHosts.end() &&
+       std::find(lost.begin(), lost.end(), hostPart.first) == lost.end()) {
+     lost.emplace_back(hostPart.first);
}
}
for (auto& h : activeHosts) {
@@ -220,7 +219,7 @@ LeaderBalanceJobExecutor::LeaderBalanceJobExecutor(JobID jobId,
: MetaJobExecutor(jobId, kvstore, adminClient, params),
inLeaderBalance_(false),
hostLeaderMap_(nullptr) {
- executor_.reset(new folly::CPUThreadPoolExecutor(1));
+ executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(1);
}

nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::finish(bool ret) {
@@ -243,7 +242,7 @@ folly::Future<Status> LeaderBalanceJobExecutor::executeInternal() {

bool expected = false;
if (inLeaderBalance_.compare_exchange_strong(expected, true)) {
- hostLeaderMap_.reset(new HostLeaderMap);
+ hostLeaderMap_ = std::make_unique<HostLeaderMap>();
auto status = adminClient_->getLeaderDist(hostLeaderMap_.get()).get();
if (!status.ok() || hostLeaderMap_->empty()) {
inLeaderBalance_ = false;
@@ -287,6 +286,7 @@ folly::Future<Status> LeaderBalanceJobExecutor::executeInternal() {

inLeaderBalance_ = false;
if (failed != 0) {
+ executorOnFinished_(meta::cpp2::JobStatus::FAILED);
return Status::Error("partiton failed to transfer leader");
}
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
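
The added FAILED notification above mirrors the FINISHED one on the success path, so executorOnFinished_ fires on every exit from executeInternal() rather than only on success. A reduced sketch of the pattern, with the callback modeled as a plain std::function rather than the real member:

```cpp
#include <functional>
#include <iostream>
#include <string>

enum class JobStatus { FINISHED, FAILED };

// Hypothetical reduction of the executeInternal() control flow:
// every return path reports a terminal status before handing back the result.
std::string executeInternal(int failed, const std::function<void(JobStatus)>& onFinished) {
  if (failed != 0) {
    onFinished(JobStatus::FAILED);  // the line this hunk adds
    return "partition failed to transfer leader";
  }
  onFinished(JobStatus::FINISHED);
  return "ok";
}

int main() {
  auto report = [](JobStatus s) {
    std::cout << (s == JobStatus::FINISHED ? "finished\n" : "failed\n");
  };
  std::cout << executeInternal(0, report) << "\n";  // reports finished, then "ok"
  std::cout << executeInternal(3, report) << "\n";  // reports failed, then the error text
}
```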
@@ -354,10 +354,10 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> LeaderBalanceJobExecutor::buildLeaderBala
}

if (dependentOnZone) {
- for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) {
-   auto min = it->second.size() / replicaFactor;
-   VLOG(3) << "Host: " << it->first << " Bounds: " << min << " : " << min + 1;
-   hostBounds_[it->first] = std::make_pair(min, min + 1);
+ for (auto& allHostPart : allHostParts) {
+   auto min = allHostPart.second.size() / replicaFactor;
+   VLOG(3) << "Host: " << allHostPart.first << " Bounds: " << min << " : " << min + 1;
+   hostBounds_[allHostPart.first] = std::make_pair(min, min + 1);
}
} else {
size_t activeSize = activeHosts.size();
@@ -377,8 +377,8 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> LeaderBalanceJobExecutor::buildLeaderBala
VLOG(3) << "Build leader balance plan, expected min load: " << globalMin
<< ", max load: " << globalMax << " avg: " << globalAvg;

- for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) {
-   hostBounds_[it->first] = std::make_pair(globalMin, globalMax);
+ for (auto& allHostPart : allHostParts) {
+   hostBounds_[allHostPart.first] = std::make_pair(globalMin, globalMax);
}
}

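
The remaining LeaderBalanceJobExecutor.cpp hunks are mechanical modernizations: explicit iterator loops become range-based for loops, and reset(new T(...)) becomes std::make_unique<T>(...). A small self-contained sketch of both idioms (the map and pointers below are placeholders, not the real HostParts or folly executor types):

```cpp
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

int main() {
  std::map<std::string, std::vector<int>> hostParts{{"h1", {1, 2}}, {"h2", {3}}};

  // Iterator style, as the old code was written:
  for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
    std::cout << it->first << " holds " << it->second.size() << " parts\n";
  }

  // Range-based style, as the refactor rewrites it; same iteration order and behavior:
  for (const auto& hostPart : hostParts) {
    std::cout << hostPart.first << " holds " << hostPart.second.size() << " parts\n";
  }

  // reset(new T(...)) versus make_unique: identical result, but make_unique
  // avoids a naked new and is exception-safe inside larger expressions.
  std::unique_ptr<std::vector<int>> oldStyle;
  oldStyle.reset(new std::vector<int>(1));
  auto newStyle = std::make_unique<std::vector<int>>(1);
  std::cout << oldStyle->size() << " " << newStyle->size() << "\n";
}
```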
