Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job use future #4654

Merged
merged 5 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions src/meta/processors/job/BalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@

namespace nebula {
namespace meta {
BalanceJobExecutor::BalanceJobExecutor(GraphSpaceID space,
JobID jobId,
BalanceJobExecutor::BalanceJobExecutor(JobDescription jobDescription,
kvstore::KVStore* kvstore,
AdminClient* adminClient,
const std::vector<std::string>& paras)
: MetaJobExecutor(space, jobId, kvstore, adminClient, paras) {}
: MetaJobExecutor(jobDescription, kvstore, adminClient, paras) {}

nebula::cpp2::ErrorCode BalanceJobExecutor::check() {
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -49,14 +48,7 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() {
auto optJobRet = JobDescription::makeJobDescription(jobKey, value);
auto optJob = nebula::value(optJobRet);
plan_.reset(new BalancePlan(optJob, kvstore_, adminClient_));
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = updateLastTime();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed";
}
executorOnFinished_(status);
});

auto recRet = plan_->recovery();
if (recRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
plan_.reset(nullptr);
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/BalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ struct SpaceInfo {
*/
class BalanceJobExecutor : public MetaJobExecutor {
public:
BalanceJobExecutor(GraphSpaceID space,
JobID jobId,
BalanceJobExecutor(JobDescription jobDescription,
kvstore::KVStore* kvstore,
AdminClient* adminClient,
const std::vector<std::string>& params);
Expand Down
23 changes: 14 additions & 9 deletions src/meta/processors/job/BalancePlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ void BalancePlan::dispatchTasks() {
}
}

void BalancePlan::invoke() {
folly::Future<meta::cpp2::JobStatus> BalancePlan::invoke() {
auto retFuture = promise_.getFuture();

// Sort the tasks by its id to ensure the order after recovery.
setStatus(meta::cpp2::JobStatus::RUNNING);
std::sort(
Expand Down Expand Up @@ -60,9 +62,11 @@ void BalancePlan::invoke() {
if (finished) {
CHECK_EQ(j, buckets_[i].size() - 1);
saveInStore();
onFinished_(stopped ? meta::cpp2::JobStatus::STOPPED
: (failed_ ? meta::cpp2::JobStatus::FAILED
: meta::cpp2::JobStatus::FINISHED));
VLOG(4) << "BalancePlan::invoke finish";
auto result =
stopped ? meta::cpp2::JobStatus::STOPPED
: failed_ ? meta::cpp2::JobStatus::FAILED : meta::cpp2::JobStatus::FINISHED;
promise_.setValue(result);
} else if (j + 1 < buckets_[i].size()) {
auto& task = tasks_[buckets_[i][j + 1]];
if (stopped) {
Expand All @@ -83,6 +87,7 @@ void BalancePlan::invoke() {
}
}
}; // onFinished

tasks_[taskIndex].onError_ = [this, i, j]() {
bool finished = false;
bool stopped = false;
Expand All @@ -100,7 +105,9 @@ void BalancePlan::invoke() {
}
if (finished) {
CHECK_EQ(j, buckets_[i].size() - 1);
onFinished_(stopped ? meta::cpp2::JobStatus::STOPPED : meta::cpp2::JobStatus::FAILED);
VLOG(4) << "BalancePlan::invoke finish";
auto result = stopped ? meta::cpp2::JobStatus::STOPPED : meta::cpp2::JobStatus::FAILED;
promise_.setValue(result);
} else if (j + 1 < buckets_[i].size()) {
auto& task = tasks_[buckets_[i][j + 1]];
LOG(INFO) << "Skip the task for the same partId " << task.partId_;
Expand Down Expand Up @@ -136,6 +143,7 @@ void BalancePlan::invoke() {
tasks_[buckets_[i][0]].invoke();
}
}
return retFuture;
}

nebula::cpp2::ErrorCode BalancePlan::saveInStore() {
Expand Down Expand Up @@ -197,10 +205,6 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::BalanceTask>> BalancePlan::sh
return thriftTasks;
}

void BalancePlan::setFinishCallBack(std::function<void(meta::cpp2::JobStatus)> func) {
onFinished_ = func;
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<BalanceTask>> BalancePlan::getBalanceTasks(
JobID jobId, kvstore::KVStore* kv, AdminClient* client, bool resume) {
CHECK_NOTNULL(kv);
Expand Down Expand Up @@ -236,6 +240,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<BalanceTask>> BalancePlan::getBalan
// Resume the failed or invalid task, skip the in-progress tasks
if (task.ret_ == BalanceTaskResult::FAILED || task.ret_ == BalanceTaskResult::INVALID) {
task.ret_ = BalanceTaskResult::IN_PROGRESS;
task.status_ = BalanceTaskStatus::START;
}
auto activeHostRet = ActiveHostsMan::isLived(kv, task.dst_);
if (!nebula::ok(activeHostRet)) {
Expand Down
12 changes: 3 additions & 9 deletions src/meta/processors/job/BalancePlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BalancePlan {
tasks_.emplace_back(std::move(task));
}

void invoke();
folly::Future<meta::cpp2::JobStatus> invoke();

/**
* @brief
Expand Down Expand Up @@ -133,13 +133,6 @@ class BalancePlan {
kvstore::KVStore* kv,
AdminClient* client);

/**
* @brief Set a callback function, which would be called when job finished
*
* @param func
*/
void setFinishCallBack(std::function<void(meta::cpp2::JobStatus)> func);

template <typename InputIterator>
void insertTask(InputIterator first, InputIterator last) {
tasks_.insert(tasks_.end(), first, last);
Expand All @@ -152,14 +145,15 @@ class BalancePlan {
std::vector<BalanceTask> tasks_;
std::mutex lock_;
size_t finishedTaskNum_ = 0;
std::function<void(meta::cpp2::JobStatus)> onFinished_;
bool stopped_ = false;
bool failed_ = false;

// List of task index in tasks_;
using Bucket = std::vector<int32_t>;
std::vector<Bucket> buckets_;
std::atomic<int32_t> curIndex_;

folly::Promise<meta::cpp2::JobStatus> promise_;
};

} // namespace meta
Expand Down
10 changes: 5 additions & 5 deletions src/meta/processors/job/DataBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ folly::Future<nebula::cpp2::ErrorCode> DataBalanceJobExecutor::executeInternal()
Status status = buildBalancePlan();
if (status != Status::OK()) {
if (status == Status::Balanced()) {
executorOnFinished_(meta::cpp2::JobStatus::FINISHED);
jobDescription_.setStatus(meta::cpp2::JobStatus::FINISHED, true);
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE;
}
}
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) {

return plan_->invoke().thenValue([this](meta::cpp2::JobStatus status) mutable {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = updateLastTime();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed";
}
executorOnFinished_(status);
jobDescription_.setStatus(status, true);
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
plan_->invoke();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Status DataBalanceJobExecutor::buildBalancePlan() {
Expand Down
4 changes: 1 addition & 3 deletions src/meta/processors/job/DataBalanceJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ class DataBalanceJobExecutor : public BalanceJobExecutor {
kvstore::KVStore* kvstore,
AdminClient* adminClient,
const std::vector<std::string>& params)
: BalanceJobExecutor(
jobDescription.getSpace(), jobDescription.getJobId(), kvstore, adminClient, params),
jobDescription_(jobDescription) {}
: BalanceJobExecutor(jobDescription, kvstore, adminClient, params) {}

/**
* @brief Parse paras to lost hosts
Expand Down
5 changes: 4 additions & 1 deletion src/meta/processors/job/JobDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ class JobDescription {
cpp2::JobDesc toJobDesc();

bool operator==(const JobDescription& that) const {
return space_ == that.space_ && type_ == that.type_ && paras_ == that.paras_;
bool res = (space_ == that.space_) && (type_ == that.type_);
return (type_ == cpp2::JobType::ZONE_BALANCE || type_ == cpp2::JobType::DATA_BALANCE)
? res
: res && (paras_ == that.paras_);
}

bool operator!=(const JobDescription& that) const {
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/JobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ std::unique_ptr<JobExecutor> JobExecutorFactory::createJobExecutor(const JobDesc
ret.reset(new ZoneBalanceJobExecutor(jd, store, client, jd.getParas()));
break;
case cpp2::JobType::LEADER_BALANCE:
ret.reset(
new LeaderBalanceJobExecutor(jd.getSpace(), jd.getJobId(), store, client, jd.getParas()));
ret.reset(new LeaderBalanceJobExecutor(jd, store, client, jd.getParas()));
break;
case cpp2::JobType::REBUILD_TAG_INDEX:
ret.reset(
Expand Down
21 changes: 12 additions & 9 deletions src/meta/processors/job/JobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class JobExecutor {
*
* @return
*/
virtual nebula::cpp2::ErrorCode execute() = 0;
virtual folly::Future<nebula::cpp2::ErrorCode> execute() = 0;

/**
* @brief Stop the job when the user cancel it.
Expand All @@ -68,22 +68,23 @@ class JobExecutor {

virtual bool isMetaJob() = 0;

virtual JobDescription getJobDescription() = 0;

/**
* @brief Set a callback which will be called when job finished, storage executor don't need it,
* @brief Provide an extra status for some special tasks
*
* @param func
* @return
*/
virtual void setFinishCallBack(
std::function<nebula::cpp2::ErrorCode(meta::cpp2::JobStatus)> func) {
UNUSED(func);
}
virtual nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq&) = 0;

/**
* @brief Provide an extra status for some special tasks
* @brief Determine whether the current job executor is running
*
* @return
*/
virtual nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq&) = 0;
virtual bool isRunning() = 0;

virtual void resetRunningStatus() = 0;

protected:
nebula::cpp2::ErrorCode spaceExist();
Expand All @@ -92,6 +93,8 @@ class JobExecutor {
kvstore::KVStore* kvstore_{nullptr};

GraphSpaceID space_;

std::atomic<bool> isRunning_{false};
};

class JobExecutorFactory {
Expand Down
Loading