Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor job manager part1 #3976

Merged
merged 2 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AdminJobValidator final : public Validator {
switch (sentence_->getOp()) {
case meta::cpp2::AdminJobOp::ADD:
switch (sentence_->getCmd()) {
// All jobs are space-level, except for the jobs that need to be refactored.
case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX:
case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX:
case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX:
Expand All @@ -40,7 +41,8 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::AdminCmd::LEADER_BALANCE:
case meta::cpp2::AdminCmd::ZONE_BALANCE:
return true;
// TODO: Also space related, but not available in CreateJobExecutor now.
// TODO: download and ingest need to be refactored to use the rpc protocol.
// Currently they are using their own validator
case meta::cpp2::AdminCmd::DOWNLOAD:
case meta::cpp2::AdminCmd::INGEST:
case meta::cpp2::AdminCmd::UNKNOWN:
Expand Down
3 changes: 1 addition & 2 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -816,12 +816,11 @@ struct ListClusterInfoReq {
}

struct AddTaskRequest {
// rebuild index / flush / compact / statis
// Task distributed to storage to execute, e.g. flush, compact, stats, etc.
1: meta.AdminCmd cmd
2: i32 job_id
3: i32 task_id
4: TaskPara para
5: optional i32 concurrency
}

struct AddTaskResp {
Expand Down
4 changes: 1 addition & 3 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,7 @@ folly::Future<StatusOr<bool>> AdminClient::addTask(
GraphSpaceID spaceId,
const HostAddr& host,
const std::vector<std::string>& taskSpecificParas,
std::vector<PartitionID> parts,
int concurrency) {
std::vector<PartitionID> parts) {
folly::Promise<StatusOr<bool>> pro;
auto f = pro.getFuture();
auto adminAddr = Utils::getAdminAddrFromStoreAddr(host);
Expand All @@ -727,7 +726,6 @@ folly::Future<StatusOr<bool>> AdminClient::addTask(
req.cmd_ref() = cmd;
req.job_id_ref() = jobId;
req.task_id_ref() = taskId;
req.concurrency_ref() = concurrency;

storage::cpp2::TaskPara para;
para.space_id_ref() = spaceId;
Expand Down
4 changes: 1 addition & 3 deletions src/meta/processors/admin/AdminClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ class AdminClient {
* @param host Target host to add task
* @param taskSpecficParas
* @param parts
* @param concurrency
* @return folly::Future<StatusOr<bool>> Return true if succeed, else return an error status
*/
virtual folly::Future<StatusOr<bool>> addTask(cpp2::AdminCmd cmd,
Expand All @@ -213,8 +212,7 @@ class AdminClient {
GraphSpaceID spaceId,
const HostAddr& host,
const std::vector<std::string>& taskSpecficParas,
std::vector<PartitionID> parts,
int concurrency);
std::vector<PartitionID> parts);

/**
 * @brief Stop storage admin task in the given storage host
Expand Down
93 changes: 53 additions & 40 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
#include "common/base/StatusOr.h"
#include "common/stats/StatsManager.h"
#include "meta/processors/job/JobDescription.h"
#include "meta/processors/job/JobManager.h"

namespace nebula {
namespace meta {

void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
cpp2::AdminJobResult result;
auto errorCode = nebula::cpp2::ErrorCode::SUCCEEDED;
std::stringstream oss;
oss << "op = " << apache::thrift::util::enumNameSafe(req.get_op());
if (req.get_op() == nebula::meta::cpp2::AdminJobOp::ADD) {
Expand All @@ -27,45 +24,17 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
}
LOG(INFO) << __func__ << "() " << oss.str();

JobManager* jobMgr = JobManager::getInstance();
cpp2::AdminJobResult result;
auto errorCode = nebula::cpp2::ErrorCode::SUCCEEDED;
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
jobMgr_ = JobManager::getInstance();

switch (req.get_op()) {
case nebula::meta::cpp2::AdminJobOp::ADD: {
auto cmd = req.get_cmd();
auto paras = req.get_paras();
if (cmd == cpp2::AdminCmd::REBUILD_TAG_INDEX || cmd == cpp2::AdminCmd::REBUILD_EDGE_INDEX ||
cmd == cpp2::AdminCmd::STATS) {
if (paras.empty()) {
LOG(INFO) << "Parameter should be not empty";
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
}

JobID jId = 0;
auto jobExist = jobMgr->checkJobExist(cmd, paras, jId);
if (jobExist) {
LOG(INFO) << "Job has already exists: " << jId;
result.job_id_ref() = jId;
break;
}

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto jobId = autoIncrementId();
// check if Job not exists
if (!nebula::ok(jobId)) {
errorCode = nebula::error(jobId);
break;
}

JobDescription jobDesc(nebula::value(jobId), cmd, paras);
errorCode = jobMgr->addJob(jobDesc, adminClient_);
if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
result.job_id_ref() = nebula::value(jobId);
}
errorCode = addJobProcess(req, result);
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
break;
}
case nebula::meta::cpp2::AdminJobOp::SHOW_All: {
auto ret = jobMgr->showJobs(req.get_paras().back());
auto ret = jobMgr_->showJobs(req.get_paras().back());
if (nebula::ok(ret)) {
result.job_desc_ref() = nebula::value(ret);
} else {
Expand All @@ -87,7 +56,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
auto ret = jobMgr->showJob(iJob, req.get_paras().back());
auto ret = jobMgr_->showJob(iJob, req.get_paras().back());
if (nebula::ok(ret)) {
result.job_desc_ref() = std::vector<cpp2::JobDesc>{nebula::value(ret).first};
result.task_desc_ref() = nebula::value(ret).second;
Expand All @@ -109,7 +78,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
errorCode = jobMgr->stopJob(iJob, req.get_paras().back());
errorCode = jobMgr_->stopJob(iJob, req.get_paras().back());
break;
}
case nebula::meta::cpp2::AdminJobOp::RECOVER: {
Expand All @@ -120,7 +89,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
for (size_t i = 0; i < paras.size() - 1; i++) {
jobIds.push_back(std::stoi(paras[i]));
}
auto ret = jobMgr->recoverJob(spaceName, adminClient_, jobIds);
auto ret = jobMgr_->recoverJob(spaceName, adminClient_, jobIds);
if (nebula::ok(ret)) {
result.recovered_job_num_ref() = nebula::value(ret);
} else {
Expand All @@ -142,5 +111,49 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
onFinished();
}

nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq& req,
                                                         cpp2::AdminJobResult& result) {
  auto cmd = req.get_cmd();
  auto paras = req.get_paras();

  // All jobs here are space-level, so the last parameter is the space name.
  if (paras.empty()) {
    LOG(INFO) << "Parameter should be not empty";
    return nebula::cpp2::ErrorCode::E_INVALID_PARM;
  }

  // Check that the target space exists before doing anything else.
  auto spaceName = paras.back();
  auto spaceRet = getSpaceId(spaceName);
  if (!nebula::ok(spaceRet)) {
    auto retCode = nebula::error(spaceRet);
    LOG(INFO) << "Get space failed, space name: " << spaceName
              << " error: " << apache::thrift::util::enumNameSafe(retCode);
    return retCode;
  }

  // If an identical job already exists, return its id instead of enqueuing a duplicate.
  JobID jId = 0;
  auto jobExist = jobMgr_->checkJobExist(cmd, paras, jId);
  if (jobExist) {
    LOG(INFO) << "Job has already exists: " << jId;
    result.job_id_ref() = jId;
    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }

  // Hold the write lock while allocating the job id and inserting the job,
  // so concurrent requests cannot race on the id counter.
  folly::SharedMutex::WriteHolder holder(LockUtils::lock());
  auto jobId = autoIncrementId();
  if (!nebula::ok(jobId)) {
    return nebula::error(jobId);
  }

  JobDescription jobDesc(nebula::value(jobId), cmd, paras);
  auto errorCode = jobMgr_->addJob(jobDesc, adminClient_);
  if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
    result.job_id_ref() = nebula::value(jobId);
  }
  return errorCode;
}

} // namespace meta
} // namespace nebula
12 changes: 12 additions & 0 deletions src/meta/processors/job/AdminJobProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "common/stats/StatsManager.h"
#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"
#include "meta/processors/job/JobManager.h"

namespace nebula {
namespace meta {
Expand All @@ -28,8 +29,19 @@ class AdminJobProcessor : public BaseProcessor<cpp2::AdminJobResp> {
AdminJobProcessor(kvstore::KVStore* kvstore, AdminClient* adminClient)
: BaseProcessor<cpp2::AdminJobResp>(kvstore), adminClient_(adminClient) {}

private:
/**
* @brief Check whether the parameters are legal, then construct the job and join the queue.
*
* @param req
* @param result
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode addJobProcess(const cpp2::AdminJobReq& req, cpp2::AdminJobResult& result);

protected:
AdminClient* adminClient_{nullptr};
JobManager* jobMgr_{nullptr};
};

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/CompactJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ folly::Future<Status> CompactJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
{},
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/FlushJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ folly::Future<Status> FlushJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
{},
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ void JobManager::scheduleThread() {
}
}

// @return: true if all task dispatched, else false
bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) {
std::lock_guard<std::recursive_mutex> lk(muJobFinished_);
std::unique_ptr<JobExecutor> je =
Expand Down Expand Up @@ -285,7 +284,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
auto jobExec = JobExecutorFactory::createJobExecutor(optJobDesc, kvStore_, adminClient_);

if (!jobExec) {
LOG(INFO) << folly::sformat("createMetaJobExecutor failed(), jobId={}", jobId);
LOG(INFO) << folly::sformat("createJobExecutor failed(), jobId={}", jobId);
return nebula::cpp2::ErrorCode::E_TASK_REPORT_OUT_DATE;
}

Expand Down
9 changes: 7 additions & 2 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,15 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
JobManager() = default;

void scheduleThread();
void scheduleThreadOld();

/**
* @brief Dispatch all tasks of one job
*
* @param jobDesc
* @param op
* @return true if all task dispatched, else false.
*/
bool runJobInternal(const JobDescription& jobDesc, JbOp op);
bool runJobInternalOld(const JobDescription& jobDesc);

ErrorOr<nebula::cpp2::ErrorCode, GraphSpaceID> getSpaceId(const std::string& name);

Expand Down
1 change: 0 additions & 1 deletion src/meta/processors/job/MetaJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class MetaJobExecutor : public JobExecutor {
AdminClient* adminClient_{nullptr};
GraphSpaceID space_;
std::vector<std::string> paras_;
int32_t concurrency_{INT_MAX};
volatile bool stopped_{false};
std::mutex muInterrupt_;
std::condition_variable condInterrupt_;
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/RebuildEdgeJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ folly::Future<Status> RebuildEdgeJobExecutor::executeInternal(HostAddr&& address
space_,
std::move(address),
taskParameters_,
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/RebuildFTJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ folly::Future<Status> RebuildFTJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
taskParameters_,
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/RebuildTagJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ folly::Future<Status> RebuildTagJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
taskParameters_,
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
5 changes: 1 addition & 4 deletions src/meta/processors/job/SimpleConcurrentJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ SimpleConcurrentJobExecutor::SimpleConcurrentJobExecutor(JobID jobId,

bool SimpleConcurrentJobExecutor::check() {
auto parasNum = paras_.size();
return parasNum == 1 || parasNum == 2;
return parasNum == 1;
}

nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() {
Expand All @@ -37,9 +37,6 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() {
return nebula::error(errOrHost);
}

if (paras_.size() > 1) {
concurrency_ = std::atoi(paras_[0].c_str());
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down
8 changes: 4 additions & 4 deletions src/meta/processors/job/StatsJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ nebula::cpp2::ErrorCode StatsJobExecutor::doRemove(const std::string& key) {
}

nebula::cpp2::ErrorCode StatsJobExecutor::prepare() {
auto spaceRet = getSpaceIdFromName(paras_[0]);
std::string spaceName = paras_.back();
auto spaceRet = getSpaceIdFromName(spaceName);
if (!nebula::ok(spaceRet)) {
LOG(INFO) << "Can't find the space: " << paras_[0];
LOG(INFO) << "Can't find the space: " << spaceName;
return nebula::error(spaceRet);
}
space_ = nebula::value(spaceRet);
Expand All @@ -68,8 +69,7 @@ folly::Future<Status> StatsJobExecutor::executeInternal(HostAddr&& address,
space_,
std::move(address),
{},
std::move(parts),
concurrency_)
std::move(parts))
.then([pro = std::move(pro)](auto&& t) mutable {
CHECK(!t.hasException());
auto status = std::move(t).value();
Expand Down
1 change: 0 additions & 1 deletion src/meta/processors/job/StorageJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ class StorageJobExecutor : public JobExecutor {
GraphSpaceID space_;
std::vector<std::string> paras_;
TargetHosts toHost_{TargetHosts::DEFAULT};
int32_t concurrency_{INT_MAX};
volatile bool stopped_{false};
std::mutex muInterrupt_;
std::condition_variable condInterrupt_;
Expand Down
Loading