Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor job manager part3 #4045

Merged
merged 4 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1149,10 +1149,10 @@ PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
}

folly::Future<StatusOr<cpp2::AdminJobResult>> MetaClient::submitJob(
cpp2::AdminJobOp op, cpp2::AdminCmd cmd, std::vector<std::string> paras) {
cpp2::JobOp op, cpp2::JobType type, std::vector<std::string> paras) {
cpp2::AdminJobReq req;
req.op_ref() = op;
req.cmd_ref() = cmd;
req.type_ref() = type;
req.paras_ref() = std::move(paras);
folly::Promise<StatusOr<cpp2::AdminJobResult>> promise;
auto future = promise.getFuture();
Expand Down
4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ class MetaClient {
listener_ = nullptr;
}

folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(cpp2::AdminJobOp op,
cpp2::AdminCmd cmd,
folly::Future<StatusOr<cpp2::AdminJobResult>> submitJob(cpp2::JobOp op,
cpp2::JobType type,
std::vector<std::string> paras);

// Operations for parts
Expand Down
26 changes: 13 additions & 13 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ folly::Future<Status> SubmitJobExecutor::execute() {

auto *sjNode = asNode<SubmitJob>(node());
auto jobOp = sjNode->jobOp();
auto cmd = sjNode->cmd();
auto jobType = sjNode->jobType();
auto params = sjNode->params();

return qctx()
->getMetaClient()
->submitJob(jobOp, cmd, params)
->submitJob(jobOp, jobType, params)
.via(runner())
.thenValue([jobOp, this](StatusOr<meta::cpp2::AdminJobResult> &&resp) {
SCOPED_TIMER(&execTime_);
Expand All @@ -40,10 +40,10 @@ folly::Future<Status> SubmitJobExecutor::execute() {
});
}

StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::JobOp jobOp,
meta::cpp2::AdminJobResult &&resp) {
switch (jobOp) {
case meta::cpp2::AdminJobOp::ADD: {
case meta::cpp2::JobOp::ADD: {
nebula::DataSet v({"New Job Id"});
DCHECK(resp.job_id_ref().has_value());
if (!resp.job_id_ref().has_value()) {
Expand All @@ -52,7 +52,7 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
v.emplace_back(nebula::Row({*resp.job_id_ref()}));
return v;
}
case meta::cpp2::AdminJobOp::RECOVER: {
case meta::cpp2::JobOp::RECOVER: {
nebula::DataSet v({"Recovered job num"});
DCHECK(resp.recovered_job_num_ref().has_value());
if (!resp.recovered_job_num_ref().has_value()) {
Expand All @@ -61,7 +61,7 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
v.emplace_back(nebula::Row({*resp.recovered_job_num_ref()}));
return v;
}
case meta::cpp2::AdminJobOp::SHOW: {
case meta::cpp2::JobOp::SHOW: {
DCHECK(resp.job_desc_ref().has_value());
if (!resp.job_desc_ref().has_value()) {
return Status::Error("Response unexpected.");
Expand All @@ -73,7 +73,7 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
auto &jobDesc = *resp.job_desc_ref();
return buildShowResultData(jobDesc.front(), *resp.get_task_desc());
}
case meta::cpp2::AdminJobOp::SHOW_All: {
case meta::cpp2::JobOp::SHOW_All: {
nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"});
DCHECK(resp.job_desc_ref().has_value());
if (!resp.job_desc_ref().has_value()) {
Expand All @@ -83,15 +83,15 @@ StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
for (const auto &jobDesc : jobsDesc) {
v.emplace_back(nebula::Row({
jobDesc.get_id(),
apache::thrift::util::enumNameSafe(jobDesc.get_cmd()),
apache::thrift::util::enumNameSafe(jobDesc.get_type()),
apache::thrift::util::enumNameSafe(jobDesc.get_status()),
convertJobTimestampToDateTime(jobDesc.get_start_time()),
convertJobTimestampToDateTime(jobDesc.get_stop_time()),
}));
}
return v;
}
case meta::cpp2::AdminJobOp::STOP: {
case meta::cpp2::JobOp::STOP: {
nebula::DataSet v({"Result"});
v.emplace_back(nebula::Row({"Job stopped"}));
return v;
Expand All @@ -109,16 +109,16 @@ Value SubmitJobExecutor::convertJobTimestampToDateTime(int64_t timestamp) {

nebula::DataSet SubmitJobExecutor::buildShowResultData(
const nebula::meta::cpp2::JobDesc &jd, const std::vector<nebula::meta::cpp2::TaskDesc> &td) {
if (jd.get_cmd() == meta::cpp2::AdminCmd::DATA_BALANCE ||
jd.get_cmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) {
if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE ||
jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) {
nebula::DataSet v(
{"Job Id(spaceId:partId)", "Command(src->dst)", "Status", "Start Time", "Stop Time"});
const auto &paras = jd.get_paras();
size_t index = std::stoul(paras.back());
uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0,
invalid = 0;
v.emplace_back(Row({jd.get_id(),
apache::thrift::util::enumNameSafe(jd.get_cmd()),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()).toString(),
convertJobTimestampToDateTime(jd.get_stop_time()).toString()}));
Expand Down Expand Up @@ -155,7 +155,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData(
nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"});
v.emplace_back(nebula::Row({
jd.get_id(),
apache::thrift::util::enumNameSafe(jd.get_cmd()),
apache::thrift::util::enumNameSafe(jd.get_type()),
apache::thrift::util::enumNameSafe(jd.get_status()),
convertJobTimestampToDateTime(jd.get_start_time()),
convertJobTimestampToDateTime(jd.get_stop_time()),
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/admin/SubmitJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SubmitJobExecutor final : public Executor {

private:
FRIEND_TEST(JobTest, JobFinishTime);
StatusOr<DataSet> buildResult(meta::cpp2::AdminJobOp jobOp, meta::cpp2::AdminJobResult &&resp);
StatusOr<DataSet> buildResult(meta::cpp2::JobOp jobOp, meta::cpp2::AdminJobResult &&resp);
Value convertJobTimestampToDateTime(int64_t timestamp);
nebula::DataSet buildShowResultData(const nebula::meta::cpp2::JobDesc &jd,
const std::vector<nebula::meta::cpp2::TaskDesc> &td);
Expand Down
8 changes: 4 additions & 4 deletions src/graph/executor/test/JobTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ TEST_F(JobTest, JobFinishTime) {

auto qctx = std::make_unique<QueryContext>();
auto submitJob = SubmitJob::make(
qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW, meta::cpp2::AdminCmd::UNKNOWN, {});
qctx.get(), nullptr, meta::cpp2::JobOp::SHOW, meta::cpp2::JobType::UNKNOWN, {});
auto submitJobExe = std::make_unique<SubmitJobExecutor>(submitJob, qctx.get());

auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW, std::move(resp));
auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW, std::move(resp));
EXPECT_TRUE(status.ok());
auto result = std::move(status).value();
EXPECT_EQ(result.rows.size(), 2);
Expand All @@ -53,10 +53,10 @@ TEST_F(JobTest, JobFinishTime) {

auto qctx = std::make_unique<QueryContext>();
auto submitJob = SubmitJob::make(
qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW_All, meta::cpp2::AdminCmd::UNKNOWN, {});
qctx.get(), nullptr, meta::cpp2::JobOp::SHOW_All, meta::cpp2::JobType::UNKNOWN, {});
auto submitJobExe = std::make_unique<SubmitJobExecutor>(submitJob, qctx.get());

auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW_All, std::move(resp));
auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW_All, std::move(resp));
EXPECT_TRUE(status.ok());
auto result = std::move(status).value();
EXPECT_EQ(result.rows.size(), 1);
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/plan/Admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ std::unique_ptr<PlanNodeDescription> ListRoles::explain() const {
std::unique_ptr<PlanNodeDescription> SubmitJob::explain() const {
auto desc = SingleDependencyNode::explain();
addDescription("operation", apache::thrift::util::enumNameSafe(op_), desc.get());
addDescription("command", apache::thrift::util::enumNameSafe(cmd_), desc.get());
addDescription("command", apache::thrift::util::enumNameSafe(type_), desc.get());
addDescription("parameters", folly::toJson(util::toJson(params_)), desc.get());
return desc;
}
Expand Down
22 changes: 11 additions & 11 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -908,21 +908,21 @@ class SubmitJob final : public SingleDependencyNode {
public:
static SubmitJob* make(QueryContext* qctx,
PlanNode* dep,
meta::cpp2::AdminJobOp op,
meta::cpp2::AdminCmd cmd,
meta::cpp2::JobOp op,
meta::cpp2::JobType type,
const std::vector<std::string>& params) {
return qctx->objPool()->add(new SubmitJob(qctx, dep, op, cmd, params));
return qctx->objPool()->add(new SubmitJob(qctx, dep, op, type, params));
}

std::unique_ptr<PlanNodeDescription> explain() const override;

public:
meta::cpp2::AdminJobOp jobOp() const {
meta::cpp2::JobOp jobOp() const {
return op_;
}

meta::cpp2::AdminCmd cmd() const {
return cmd_;
meta::cpp2::JobType jobType() const {
return type_;
}

const std::vector<std::string>& params() const {
Expand All @@ -932,14 +932,14 @@ class SubmitJob final : public SingleDependencyNode {
private:
SubmitJob(QueryContext* qctx,
PlanNode* dep,
meta::cpp2::AdminJobOp op,
meta::cpp2::AdminCmd cmd,
meta::cpp2::JobOp op,
meta::cpp2::JobType type,
const std::vector<std::string>& params)
: SingleDependencyNode(qctx, Kind::kSubmitJob, dep), op_(op), cmd_(cmd), params_(params) {}
: SingleDependencyNode(qctx, Kind::kSubmitJob, dep), op_(op), type_(type), params_(params) {}

private:
meta::cpp2::AdminJobOp op_;
meta::cpp2::AdminCmd cmd_;
meta::cpp2::JobOp op_;
meta::cpp2::JobType type_;
const std::vector<std::string> params_;
};

Expand Down
16 changes: 8 additions & 8 deletions src/graph/validator/AdminJobValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ namespace nebula {
namespace graph {

Status AdminJobValidator::validateImpl() {
if (sentence_->getCmd() == meta::cpp2::AdminCmd::DATA_BALANCE ||
sentence_->getCmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) {
if (sentence_->getJobType() == meta::cpp2::JobType::DATA_BALANCE ||
sentence_->getJobType() == meta::cpp2::JobType::ZONE_BALANCE) {
return Status::SemanticError("Data balance not support");
}
if (sentence_->getOp() == meta::cpp2::AdminJobOp::ADD) {
auto cmd = sentence_->getCmd();
if (sentence_->getOp() == meta::cpp2::JobOp::ADD) {
auto jobType = sentence_->getJobType();
if (requireSpace()) {
const auto &spaceInfo = vctx_->whichSpace();
auto spaceId = spaceInfo.id;
const auto &spaceName = spaceInfo.name;
sentence_->addPara(spaceName);

if (cmd == meta::cpp2::AdminCmd::REBUILD_TAG_INDEX ||
cmd == meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX) {
auto ret = cmd == meta::cpp2::AdminCmd::REBUILD_TAG_INDEX
if (jobType == meta::cpp2::JobType::REBUILD_TAG_INDEX ||
jobType == meta::cpp2::JobType::REBUILD_EDGE_INDEX) {
auto ret = jobType == meta::cpp2::JobType::REBUILD_TAG_INDEX
? qctx()->indexMng()->getTagIndexes(spaceId)
: qctx()->indexMng()->getEdgeIndexes(spaceId);
if (!ret.ok()) {
Expand Down Expand Up @@ -60,7 +60,7 @@ Status AdminJobValidator::validateImpl() {

Status AdminJobValidator::toPlan() {
auto *doNode = SubmitJob::make(
qctx_, nullptr, sentence_->getOp(), sentence_->getCmd(), sentence_->getParas());
qctx_, nullptr, sentence_->getOp(), sentence_->getJobType(), sentence_->getParas());
root_ = doNode;
tail_ = root_;
return Status::OK();
Expand Down
36 changes: 18 additions & 18 deletions src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,31 @@ class AdminJobValidator final : public Validator {

bool requireSpace() const {
switch (sentence_->getOp()) {
case meta::cpp2::AdminJobOp::ADD:
switch (sentence_->getCmd()) {
case meta::cpp2::JobOp::ADD:
switch (sentence_->getJobType()) {
// All jobs are space-level, except for the jobs that need to be refactored.
case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX:
case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX:
case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX:
case meta::cpp2::AdminCmd::STATS:
case meta::cpp2::AdminCmd::COMPACT:
case meta::cpp2::AdminCmd::FLUSH:
case meta::cpp2::AdminCmd::DATA_BALANCE:
case meta::cpp2::AdminCmd::LEADER_BALANCE:
case meta::cpp2::AdminCmd::ZONE_BALANCE:
case meta::cpp2::JobType::REBUILD_TAG_INDEX:
case meta::cpp2::JobType::REBUILD_EDGE_INDEX:
case meta::cpp2::JobType::REBUILD_FULLTEXT_INDEX:
case meta::cpp2::JobType::STATS:
case meta::cpp2::JobType::COMPACT:
case meta::cpp2::JobType::FLUSH:
case meta::cpp2::JobType::DATA_BALANCE:
case meta::cpp2::JobType::LEADER_BALANCE:
case meta::cpp2::JobType::ZONE_BALANCE:
return true;
// TODO: download and ingest need to be refactored to use the rpc protocol.
// Currently they are using their own validator
case meta::cpp2::AdminCmd::DOWNLOAD:
case meta::cpp2::AdminCmd::INGEST:
case meta::cpp2::AdminCmd::UNKNOWN:
case meta::cpp2::JobType::DOWNLOAD:
case meta::cpp2::JobType::INGEST:
case meta::cpp2::JobType::UNKNOWN:
return false;
}
break;
case meta::cpp2::AdminJobOp::SHOW_All:
case meta::cpp2::AdminJobOp::SHOW:
case meta::cpp2::AdminJobOp::STOP:
case meta::cpp2::AdminJobOp::RECOVER:
case meta::cpp2::JobOp::SHOW_All:
case meta::cpp2::JobOp::SHOW:
case meta::cpp2::JobOp::STOP:
case meta::cpp2::JobOp::RECOVER:
return true;
}
return false;
Expand Down
18 changes: 9 additions & 9 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,15 @@ struct AlterSpaceReq {
}

// Job related data structures
enum AdminJobOp {
enum JobOp {
ADD = 0x01,
SHOW_All = 0x02,
SHOW = 0x03,
STOP = 0x04,
RECOVER = 0x05,
} (cpp.enum_strict)

struct AdminJobReq {
1: AdminJobOp op,
2: AdminCmd cmd,
3: list<binary> paras,
}

enum AdminCmd {
enum JobType {
COMPACT = 0,
FLUSH = 1,
REBUILD_TAG_INDEX = 2,
Expand All @@ -249,6 +243,12 @@ enum AdminCmd {
UNKNOWN = 99,
} (cpp.enum_strict)

struct AdminJobReq {
1: JobOp op,
2: JobType type,
3: list<binary> paras,
}

enum JobStatus {
QUEUE = 0x01,
RUNNING = 0x02,
Expand All @@ -260,7 +260,7 @@ enum JobStatus {

struct JobDesc {
1: i32 id,
2: AdminCmd cmd,
2: JobType type,
3: list<string> paras,
4: JobStatus status,
5: i64 start_time,
Expand Down
2 changes: 1 addition & 1 deletion src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ struct ListClusterInfoReq {

struct AddTaskRequest {
// Task distributed to storage to execute, e.g. flush, compact, stats, etc.
1: meta.AdminCmd cmd
1: meta.JobType job_type
2: i32 job_id
3: i32 task_id
4: TaskPara para
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ folly::Future<StatusOr<bool>> AdminClient::blockingWrites(const std::set<GraphSp
}

folly::Future<StatusOr<bool>> AdminClient::addTask(
cpp2::AdminCmd cmd,
cpp2::JobType type,
int32_t jobId,
int32_t taskId,
GraphSpaceID spaceId,
Expand All @@ -767,7 +767,7 @@ folly::Future<StatusOr<bool>> AdminClient::addTask(
auto adminAddr = Utils::getAdminAddrFromStoreAddr(host);

storage::cpp2::AddTaskRequest req;
req.cmd_ref() = cmd;
req.job_type_ref() = type;
req.job_id_ref() = jobId;
req.task_id_ref() = taskId;

Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/admin/AdminClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class AdminClient {
/**
* @brief Add storage admin task to given storage host
*
* @param cmd
* @param jobType
* @param jobId
* @param taskId
* @param spaceId
Expand All @@ -216,7 +216,7 @@ class AdminClient {
* @param parts
* @return folly::Future<StatusOr<bool>> Return true if succeed, else return an error status
*/
virtual folly::Future<StatusOr<bool>> addTask(cpp2::AdminCmd cmd,
virtual folly::Future<StatusOr<bool>> addTask(cpp2::JobType jobType,
int32_t jobId,
int32_t taskId,
GraphSpaceID spaceId,
Expand Down
Loading