diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index b1e7de2e63d..b9aff502ee2 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -1149,10 +1149,10 @@ PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const { } folly::Future> MetaClient::submitJob( - cpp2::AdminJobOp op, cpp2::AdminCmd cmd, std::vector paras) { + cpp2::JobOp op, cpp2::JobType type, std::vector paras) { cpp2::AdminJobReq req; req.op_ref() = op; - req.cmd_ref() = cmd; + req.type_ref() = type; req.paras_ref() = std::move(paras); folly::Promise> promise; auto future = promise.getFuture(); diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 23f667d7801..cb77bc2b77d 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -252,8 +252,8 @@ class MetaClient { listener_ = nullptr; } - folly::Future> submitJob(cpp2::AdminJobOp op, - cpp2::AdminCmd cmd, + folly::Future> submitJob(cpp2::JobOp op, + cpp2::JobType type, std::vector paras); // Operations for parts diff --git a/src/graph/executor/admin/SubmitJobExecutor.cpp b/src/graph/executor/admin/SubmitJobExecutor.cpp index 0d78361d60f..d75464c1c15 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.cpp +++ b/src/graph/executor/admin/SubmitJobExecutor.cpp @@ -20,12 +20,12 @@ folly::Future SubmitJobExecutor::execute() { auto *sjNode = asNode(node()); auto jobOp = sjNode->jobOp(); - auto cmd = sjNode->cmd(); + auto jobType = sjNode->jobType(); auto params = sjNode->params(); return qctx() ->getMetaClient() - ->submitJob(jobOp, cmd, params) + ->submitJob(jobOp, jobType, params) .via(runner()) .thenValue([jobOp, this](StatusOr &&resp) { SCOPED_TIMER(&execTime_); @@ -40,10 +40,10 @@ folly::Future SubmitJobExecutor::execute() { }); } -StatusOr SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp, +StatusOr SubmitJobExecutor::buildResult(meta::cpp2::JobOp jobOp, meta::cpp2::AdminJobResult &&resp) { switch (jobOp) { - case meta::cpp2::AdminJobOp::ADD: { + case meta::cpp2::JobOp::ADD: { nebula::DataSet v({"New Job Id"}); DCHECK(resp.job_id_ref().has_value()); if (!resp.job_id_ref().has_value()) { @@ -52,7 +52,7 @@ StatusOr SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp, v.emplace_back(nebula::Row({*resp.job_id_ref()})); return v; } - case meta::cpp2::AdminJobOp::RECOVER: { + case meta::cpp2::JobOp::RECOVER: { nebula::DataSet v({"Recovered job num"}); DCHECK(resp.recovered_job_num_ref().has_value()); if (!resp.recovered_job_num_ref().has_value()) { @@ -61,7 +61,7 @@ StatusOr SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp, v.emplace_back(nebula::Row({*resp.recovered_job_num_ref()})); return v; } - case meta::cpp2::AdminJobOp::SHOW: { + case meta::cpp2::JobOp::SHOW: { DCHECK(resp.job_desc_ref().has_value()); if (!resp.job_desc_ref().has_value()) { return Status::Error("Response unexpected."); @@ -73,7 +73,7 @@ StatusOr SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp, auto &jobDesc = *resp.job_desc_ref(); return buildShowResultData(jobDesc.front(), *resp.get_task_desc()); } - case meta::cpp2::AdminJobOp::SHOW_All: { + case meta::cpp2::JobOp::SHOW_All: { nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"}); DCHECK(resp.job_desc_ref().has_value()); if (!resp.job_desc_ref().has_value()) { @@ -83,7 +83,7 @@ StatusOr SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp, for (const auto &jobDesc : jobsDesc) { v.emplace_back(nebula::Row({ jobDesc.get_id(), - apache::thrift::util::enumNameSafe(jobDesc.get_cmd()), + apache::thrift::util::enumNameSafe(jobDesc.get_type()), apache::thrift::util::enumNameSafe(jobDesc.get_status()), convertJobTimestampToDateTime(jobDesc.get_start_time()), convertJobTimestampToDateTime(jobDesc.get_stop_time()), @@ -91,7 +91,7 @@ StatusOr SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp, } return v; } - case meta::cpp2::AdminJobOp::STOP: { + case meta::cpp2::JobOp::STOP: { nebula::DataSet v({"Result"}); v.emplace_back(nebula::Row({"Job stopped"})); return v; @@ -109,8 +109,8 @@ Value SubmitJobExecutor::convertJobTimestampToDateTime(int64_t timestamp) { nebula::DataSet SubmitJobExecutor::buildShowResultData( const nebula::meta::cpp2::JobDesc &jd, const std::vector &td) { - if (jd.get_cmd() == meta::cpp2::AdminCmd::DATA_BALANCE || - jd.get_cmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) { + if (jd.get_type() == meta::cpp2::JobType::DATA_BALANCE || + jd.get_type() == meta::cpp2::JobType::ZONE_BALANCE) { nebula::DataSet v( {"Job Id(spaceId:partId)", "Command(src->dst)", "Status", "Start Time", "Stop Time"}); const auto ¶s = jd.get_paras(); @@ -118,7 +118,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( uint32_t total = paras.size() - index - 1, succeeded = 0, failed = 0, inProgress = 0, invalid = 0; v.emplace_back(Row({jd.get_id(), - apache::thrift::util::enumNameSafe(jd.get_cmd()), + apache::thrift::util::enumNameSafe(jd.get_type()), apache::thrift::util::enumNameSafe(jd.get_status()), convertJobTimestampToDateTime(jd.get_start_time()).toString(), convertJobTimestampToDateTime(jd.get_stop_time()).toString()})); @@ -155,7 +155,7 @@ nebula::DataSet SubmitJobExecutor::buildShowResultData( nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"}); v.emplace_back(nebula::Row({ jd.get_id(), - apache::thrift::util::enumNameSafe(jd.get_cmd()), + apache::thrift::util::enumNameSafe(jd.get_type()), apache::thrift::util::enumNameSafe(jd.get_status()), convertJobTimestampToDateTime(jd.get_start_time()), convertJobTimestampToDateTime(jd.get_stop_time()), diff --git a/src/graph/executor/admin/SubmitJobExecutor.h b/src/graph/executor/admin/SubmitJobExecutor.h index 92bf2a98857..d299c9a7b0a 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.h +++ b/src/graph/executor/admin/SubmitJobExecutor.h @@ -20,7 +20,7 @@ class SubmitJobExecutor final : public Executor { private: FRIEND_TEST(JobTest, JobFinishTime); - StatusOr buildResult(meta::cpp2::AdminJobOp jobOp, meta::cpp2::AdminJobResult &&resp); + StatusOr buildResult(meta::cpp2::JobOp jobOp, meta::cpp2::AdminJobResult &&resp); Value convertJobTimestampToDateTime(int64_t timestamp); nebula::DataSet buildShowResultData(const nebula::meta::cpp2::JobDesc &jd, const std::vector &td); diff --git a/src/graph/executor/test/JobTest.cpp b/src/graph/executor/test/JobTest.cpp index dbfa5bfab01..7dbd51e8c4e 100644 --- a/src/graph/executor/test/JobTest.cpp +++ b/src/graph/executor/test/JobTest.cpp @@ -30,10 +30,10 @@ TEST_F(JobTest, JobFinishTime) { auto qctx = std::make_unique(); auto submitJob = SubmitJob::make( - qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW, meta::cpp2::AdminCmd::UNKNOWN, {}); + qctx.get(), nullptr, meta::cpp2::JobOp::SHOW, meta::cpp2::JobType::UNKNOWN, {}); auto submitJobExe = std::make_unique(submitJob, qctx.get()); - auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW, std::move(resp)); + auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW, std::move(resp)); EXPECT_TRUE(status.ok()); auto result = std::move(status).value(); EXPECT_EQ(result.rows.size(), 2); @@ -53,10 +53,10 @@ TEST_F(JobTest, JobFinishTime) { auto qctx = std::make_unique(); auto submitJob = SubmitJob::make( - qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW_All, meta::cpp2::AdminCmd::UNKNOWN, {}); + qctx.get(), nullptr, meta::cpp2::JobOp::SHOW_All, meta::cpp2::JobType::UNKNOWN, {}); auto submitJobExe = std::make_unique(submitJob, qctx.get()); - auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW_All, std::move(resp)); + auto status = submitJobExe->buildResult(meta::cpp2::JobOp::SHOW_All, std::move(resp)); EXPECT_TRUE(status.ok()); auto result = std::move(status).value(); EXPECT_EQ(result.rows.size(), 1); diff --git a/src/graph/planner/plan/Admin.cpp b/src/graph/planner/plan/Admin.cpp index fe4592cdce9..119256e47ec 100644 --- a/src/graph/planner/plan/Admin.cpp +++ b/src/graph/planner/plan/Admin.cpp @@ -176,7 +176,7 @@ std::unique_ptr ListRoles::explain() const { std::unique_ptr SubmitJob::explain() const { auto desc = SingleDependencyNode::explain(); addDescription("operation", apache::thrift::util::enumNameSafe(op_), desc.get()); - addDescription("command", apache::thrift::util::enumNameSafe(cmd_), desc.get()); + addDescription("command", apache::thrift::util::enumNameSafe(type_), desc.get()); addDescription("parameters", folly::toJson(util::toJson(params_)), desc.get()); return desc; } diff --git a/src/graph/planner/plan/Admin.h b/src/graph/planner/plan/Admin.h index 2fd3526fc0b..cc5973286d9 100644 --- a/src/graph/planner/plan/Admin.h +++ b/src/graph/planner/plan/Admin.h @@ -908,21 +908,21 @@ class SubmitJob final : public SingleDependencyNode { public: static SubmitJob* make(QueryContext* qctx, PlanNode* dep, - meta::cpp2::AdminJobOp op, - meta::cpp2::AdminCmd cmd, + meta::cpp2::JobOp op, + meta::cpp2::JobType type, const std::vector& params) { - return qctx->objPool()->add(new SubmitJob(qctx, dep, op, cmd, params)); + return qctx->objPool()->add(new SubmitJob(qctx, dep, op, type, params)); } std::unique_ptr explain() const override; public: - meta::cpp2::AdminJobOp jobOp() const { + meta::cpp2::JobOp jobOp() const { return op_; } - meta::cpp2::AdminCmd cmd() const { - return cmd_; + meta::cpp2::JobType jobType() const { + return type_; } const std::vector& params() const { @@ -932,14 +932,14 @@ class SubmitJob final : public SingleDependencyNode { private: SubmitJob(QueryContext* qctx, PlanNode* dep, - meta::cpp2::AdminJobOp op, - meta::cpp2::AdminCmd cmd, + meta::cpp2::JobOp op, + meta::cpp2::JobType type, const std::vector& params) - : SingleDependencyNode(qctx, Kind::kSubmitJob, dep), op_(op), cmd_(cmd), params_(params) {} + : SingleDependencyNode(qctx, Kind::kSubmitJob, dep), op_(op), type_(type), params_(params) {} private: - meta::cpp2::AdminJobOp op_; - meta::cpp2::AdminCmd cmd_; + meta::cpp2::JobOp op_; + meta::cpp2::JobType type_; const std::vector params_; }; diff --git a/src/graph/validator/AdminJobValidator.cpp b/src/graph/validator/AdminJobValidator.cpp index cd489587ab0..533422d98d0 100644 --- a/src/graph/validator/AdminJobValidator.cpp +++ b/src/graph/validator/AdminJobValidator.cpp @@ -11,21 +11,21 @@ namespace nebula { namespace graph { Status AdminJobValidator::validateImpl() { - if (sentence_->getCmd() == meta::cpp2::AdminCmd::DATA_BALANCE || - sentence_->getCmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) { + if (sentence_->getJobType() == meta::cpp2::JobType::DATA_BALANCE || + sentence_->getJobType() == meta::cpp2::JobType::ZONE_BALANCE) { return Status::SemanticError("Data balance not support"); } - if (sentence_->getOp() == meta::cpp2::AdminJobOp::ADD) { - auto cmd = sentence_->getCmd(); + if (sentence_->getOp() == meta::cpp2::JobOp::ADD) { + auto jobType = sentence_->getJobType(); if (requireSpace()) { const auto &spaceInfo = vctx_->whichSpace(); auto spaceId = spaceInfo.id; const auto &spaceName = spaceInfo.name; sentence_->addPara(spaceName); - if (cmd == meta::cpp2::AdminCmd::REBUILD_TAG_INDEX || - cmd == meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX) { - auto ret = cmd == meta::cpp2::AdminCmd::REBUILD_TAG_INDEX + if (jobType == meta::cpp2::JobType::REBUILD_TAG_INDEX || + jobType == meta::cpp2::JobType::REBUILD_EDGE_INDEX) { + auto ret = jobType == meta::cpp2::JobType::REBUILD_TAG_INDEX ? qctx()->indexMng()->getTagIndexes(spaceId) : qctx()->indexMng()->getEdgeIndexes(spaceId); if (!ret.ok()) { @@ -60,7 +60,7 @@ Status AdminJobValidator::validateImpl() { Status AdminJobValidator::toPlan() { auto *doNode = SubmitJob::make( - qctx_, nullptr, sentence_->getOp(), sentence_->getCmd(), sentence_->getParas()); + qctx_, nullptr, sentence_->getOp(), sentence_->getJobType(), sentence_->getParas()); root_ = doNode; tail_ = root_; return Status::OK(); diff --git a/src/graph/validator/AdminJobValidator.h b/src/graph/validator/AdminJobValidator.h index 31a931dda77..e6930fa9ff6 100644 --- a/src/graph/validator/AdminJobValidator.h +++ b/src/graph/validator/AdminJobValidator.h @@ -28,31 +28,31 @@ class AdminJobValidator final : public Validator { bool requireSpace() const { switch (sentence_->getOp()) { - case meta::cpp2::AdminJobOp::ADD: - switch (sentence_->getCmd()) { + case meta::cpp2::JobOp::ADD: + switch (sentence_->getJobType()) { // All jobs are space-level, except for the jobs that need to be refactored. - case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX: - case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX: - case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: - case meta::cpp2::AdminCmd::STATS: - case meta::cpp2::AdminCmd::COMPACT: - case meta::cpp2::AdminCmd::FLUSH: - case meta::cpp2::AdminCmd::DATA_BALANCE: - case meta::cpp2::AdminCmd::LEADER_BALANCE: - case meta::cpp2::AdminCmd::ZONE_BALANCE: + case meta::cpp2::JobType::REBUILD_TAG_INDEX: + case meta::cpp2::JobType::REBUILD_EDGE_INDEX: + case meta::cpp2::JobType::REBUILD_FULLTEXT_INDEX: + case meta::cpp2::JobType::STATS: + case meta::cpp2::JobType::COMPACT: + case meta::cpp2::JobType::FLUSH: + case meta::cpp2::JobType::DATA_BALANCE: + case meta::cpp2::JobType::LEADER_BALANCE: + case meta::cpp2::JobType::ZONE_BALANCE: return true; // TODO: download and ingest need to be refactored to use the rpc protocol. // Currently they are using their own validator - case meta::cpp2::AdminCmd::DOWNLOAD: - case meta::cpp2::AdminCmd::INGEST: - case meta::cpp2::AdminCmd::UNKNOWN: + case meta::cpp2::JobType::DOWNLOAD: + case meta::cpp2::JobType::INGEST: + case meta::cpp2::JobType::UNKNOWN: return false; } break; - case meta::cpp2::AdminJobOp::SHOW_All: - case meta::cpp2::AdminJobOp::SHOW: - case meta::cpp2::AdminJobOp::STOP: - case meta::cpp2::AdminJobOp::RECOVER: + case meta::cpp2::JobOp::SHOW_All: + case meta::cpp2::JobOp::SHOW: + case meta::cpp2::JobOp::STOP: + case meta::cpp2::JobOp::RECOVER: return true; } return false; diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 0371b46312e..349b545fb5c 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -220,7 +220,7 @@ struct AlterSpaceReq { } // Job related data structures -enum AdminJobOp { +enum JobOp { ADD = 0x01, SHOW_All = 0x02, SHOW = 0x03, @@ -228,13 +228,7 @@ enum AdminJobOp { RECOVER = 0x05, } (cpp.enum_strict) -struct AdminJobReq { - 1: AdminJobOp op, - 2: AdminCmd cmd, - 3: list paras, -} - -enum AdminCmd { +enum JobType { COMPACT = 0, FLUSH = 1, REBUILD_TAG_INDEX = 2, @@ -249,6 +243,12 @@ enum AdminCmd { UNKNOWN = 99, } (cpp.enum_strict) +struct AdminJobReq { + 1: JobOp op, + 2: JobType type, + 3: list paras, +} + enum JobStatus { QUEUE = 0x01, RUNNING = 0x02, @@ -260,7 +260,7 @@ enum JobStatus { struct JobDesc { 1: i32 id, - 2: AdminCmd cmd, + 2: JobType type, 3: list paras, 4: JobStatus status, 5: i64 start_time, diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 52c39df2e0a..2c12b1a84a7 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -817,7 +817,7 @@ struct ListClusterInfoReq { struct AddTaskRequest { // Task distributed to storage to execute, e.g. flush, compact, stats, etc. - 1: meta.AdminCmd cmd + 1: meta.JobType job_type 2: i32 job_id 3: i32 task_id 4: TaskPara para diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index c4e14620fd3..dbcfdd90a2d 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -755,7 +755,7 @@ folly::Future> AdminClient::blockingWrites(const std::set> AdminClient::addTask( - cpp2::AdminCmd cmd, + cpp2::JobType type, int32_t jobId, int32_t taskId, GraphSpaceID spaceId, @@ -767,7 +767,7 @@ folly::Future> AdminClient::addTask( auto adminAddr = Utils::getAdminAddrFromStoreAddr(host); storage::cpp2::AddTaskRequest req; - req.cmd_ref() = cmd; + req.job_type_ref() = type; req.job_id_ref() = jobId; req.task_id_ref() = taskId; diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index 42df7710760..82052514128 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -207,7 +207,7 @@ class AdminClient { /** * @brief Add storage admin task to given storage host * - * @param cmd + * @param jobType * @param jobId * @param taskId * @param spaceId @@ -216,7 +216,7 @@ class AdminClient { * @param parts * @return folly::Future> Return true if succeed, else return an error status */ - virtual folly::Future> addTask(cpp2::AdminCmd cmd, + virtual folly::Future> addTask(cpp2::JobType jobType, int32_t jobId, int32_t taskId, GraphSpaceID spaceId, diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 13723a171ad..9e1e4c41026 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -15,8 +15,8 @@ namespace meta { void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { std::stringstream oss; oss << "op = " << apache::thrift::util::enumNameSafe(req.get_op()); - if (req.get_op() == nebula::meta::cpp2::AdminJobOp::ADD) { - oss << ", cmd = " << apache::thrift::util::enumNameSafe(req.get_cmd()); + if (req.get_op() == nebula::meta::cpp2::JobOp::ADD) { + oss << ", type = " << apache::thrift::util::enumNameSafe(req.get_type()); } oss << ", paras.size()=" << req.get_paras().size(); for (auto& p : req.get_paras()) { @@ -29,11 +29,11 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { jobMgr_ = JobManager::getInstance(); switch (req.get_op()) { - case nebula::meta::cpp2::AdminJobOp::ADD: { + case nebula::meta::cpp2::JobOp::ADD: { errorCode = addJobProcess(req, result); break; } - case nebula::meta::cpp2::AdminJobOp::SHOW_All: { + case nebula::meta::cpp2::JobOp::SHOW_All: { auto ret = jobMgr_->showJobs(req.get_paras().back()); if (nebula::ok(ret)) { result.job_desc_ref() = nebula::value(ret); @@ -42,7 +42,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { } break; } - case nebula::meta::cpp2::AdminJobOp::SHOW: { + case nebula::meta::cpp2::JobOp::SHOW: { static const size_t kShowArgsNum = 2; if (req.get_paras().size() != kShowArgsNum) { LOG(INFO) << "Parameter number not matched"; @@ -65,7 +65,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { } break; } - case nebula::meta::cpp2::AdminJobOp::STOP: { + case nebula::meta::cpp2::JobOp::STOP: { static const size_t kStopJobArgsNum = 2; if (req.get_paras().size() != kStopJobArgsNum) { LOG(INFO) << "Parameter number not matched"; @@ -81,7 +81,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { errorCode = jobMgr_->stopJob(iJob, req.get_paras().back()); break; } - case nebula::meta::cpp2::AdminJobOp::RECOVER: { + case nebula::meta::cpp2::JobOp::RECOVER: { const std::vector& paras = req.get_paras(); const std::string& spaceName = req.get_paras().back(); std::vector jobIds; @@ -113,7 +113,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq& req, cpp2::AdminJobResult& result) { - auto cmd = req.get_cmd(); + auto type = req.get_type(); auto paras = req.get_paras(); // All jobs here are the space level, so the last parameter is spaceName. @@ -134,7 +134,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq // Check if job not exists JobID jId = 0; - auto jobExist = jobMgr_->checkJobExist(cmd, paras, jId); + auto jobExist = jobMgr_->checkJobExist(type, paras, jId); if (jobExist) { LOG(INFO) << "Job has already exists: " << jId; result.job_id_ref() = jId; @@ -147,7 +147,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq return nebula::error(jobId); } - JobDescription jobDesc(nebula::value(jobId), cmd, paras); + JobDescription jobDesc(nebula::value(jobId), type, paras); auto errorCode = jobMgr_->addJob(jobDesc, adminClient_); if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) { result.job_id_ref() = nebula::value(jobId); diff --git a/src/meta/processors/job/CompactJobExecutor.cpp b/src/meta/processors/job/CompactJobExecutor.cpp index 28c8ba916dd..ef689d0663c 100644 --- a/src/meta/processors/job/CompactJobExecutor.cpp +++ b/src/meta/processors/job/CompactJobExecutor.cpp @@ -19,7 +19,7 @@ folly::Future CompactJobExecutor::executeInternal(HostAddr&& address, folly::Promise pro; auto f = pro.getFuture(); adminClient_ - ->addTask(cpp2::AdminCmd::COMPACT, + ->addTask(cpp2::JobType::COMPACT, jobId_, taskId_++, space_, diff --git a/src/meta/processors/job/FlushJobExecutor.cpp b/src/meta/processors/job/FlushJobExecutor.cpp index 5e10663f1b7..cbdfae4eb38 100644 --- a/src/meta/processors/job/FlushJobExecutor.cpp +++ b/src/meta/processors/job/FlushJobExecutor.cpp @@ -19,13 +19,8 @@ folly::Future FlushJobExecutor::executeInternal(HostAddr&& address, folly::Promise pro; auto f = pro.getFuture(); adminClient_ - ->addTask(cpp2::AdminCmd::FLUSH, - jobId_, - taskId_++, - space_, - std::move(address), - {}, - std::move(parts)) + ->addTask( + cpp2::JobType::FLUSH, jobId_, taskId_++, space_, std::move(address), {}, std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/processors/job/JobDescription.cpp b/src/meta/processors/job/JobDescription.cpp index b2b43fdf36c..c3c102b0b08 100644 --- a/src/meta/processors/job/JobDescription.cpp +++ b/src/meta/processors/job/JobDescription.cpp @@ -19,19 +19,19 @@ namespace nebula { namespace meta { using Status = cpp2::JobStatus; -using AdminCmd = cpp2::AdminCmd; +using JobType = cpp2::JobType; int32_t JobDescription::minDataVer_ = 1; int32_t JobDescription::currDataVer_ = 1; JobDescription::JobDescription(JobID id, - cpp2::AdminCmd cmd, + cpp2::JobType type, std::vector paras, Status status, int64_t startTime, int64_t stopTime) : id_(id), - cmd_(cmd), + type_(type), paras_(std::move(paras)), status_(status), startTime_(startTime), @@ -51,7 +51,7 @@ ErrorOr JobDescription::makeJobDescript } auto tup = parseVal(rawval); - auto cmd = std::get<0>(tup); + auto type = std::get<0>(tup); auto paras = std::get<1>(tup); for (auto p : paras) { LOG(INFO) << "p = " << p; @@ -59,7 +59,7 @@ ErrorOr JobDescription::makeJobDescript auto status = std::get<2>(tup); auto startTime = std::get<3>(tup); auto stopTime = std::get<4>(tup); - return JobDescription(key, cmd, paras, status, startTime, stopTime); + return JobDescription(key, type, paras, status, startTime, stopTime); } catch (std::exception& ex) { LOG(INFO) << ex.what(); } @@ -90,7 +90,7 @@ std::string JobDescription::jobVal() const { // use a big num to avoid possible conflict int32_t dataVersion = INT_MAX - currDataVer_; str.append(reinterpret_cast(&dataVersion), sizeof(dataVersion)) - .append(reinterpret_cast(&cmd_), sizeof(cmd_)); + .append(reinterpret_cast(&type_), sizeof(type_)); auto paraSize = paras_.size(); str.append(reinterpret_cast(¶Size), sizeof(size_t)); for (auto& para : paras_) { @@ -104,19 +104,19 @@ std::string JobDescription::jobVal() const { return str; } -std::tuple, Status, int64_t, int64_t> JobDescription::parseVal( +std::tuple, Status, int64_t, int64_t> JobDescription::parseVal( const folly::StringPiece& rawVal) { return decodeValV1(rawVal); } // old saved data may have different format // which means we have different decoder for each version -std::tuple, Status, int64_t, int64_t> -JobDescription::decodeValV1(const folly::StringPiece& rawVal) { +std::tuple, Status, int64_t, int64_t> JobDescription::decodeValV1( + const folly::StringPiece& rawVal) { size_t offset = sizeof(int32_t); - auto cmd = JobUtil::parseFixedVal(rawVal, offset); - offset += sizeof(cmd); + auto type = JobUtil::parseFixedVal(rawVal, offset); + offset += sizeof(type); std::vector paras = JobUtil::parseStrVector(rawVal, &offset); @@ -128,13 +128,13 @@ JobDescription::decodeValV1(const folly::StringPiece& rawVal) { auto tStop = JobUtil::parseFixedVal(rawVal, offset); - return std::make_tuple(cmd, paras, status, tStart, tStop); + return std::make_tuple(type, paras, status, tStart, tStop); } cpp2::JobDesc JobDescription::toJobDesc() { cpp2::JobDesc ret; ret.id_ref() = id_; - ret.cmd_ref() = cmd_; + ret.type_ref() = type_; ret.paras_ref() = paras_; ret.status_ref() = status_; ret.start_time_ref() = startTime_; diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index 0717feca89f..ef6c0448fe1 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -39,7 +39,7 @@ class JobDescription { public: JobDescription() = default; JobDescription(JobID id, - cpp2::AdminCmd cmd, + cpp2::JobType type, std::vector paras, Status status = Status::QUEUE, int64_t startTime = 0, @@ -64,8 +64,8 @@ class JobDescription { * * @return */ - cpp2::AdminCmd getCmd() const { - return cmd_; + cpp2::JobType getJobType() const { + return type_; } /** @@ -176,7 +176,7 @@ class JobDescription { * @param rawVal * @return */ - static std::tuple, Status, int64_t, int64_t> parseVal( + static std::tuple, Status, int64_t, int64_t> parseVal( const folly::StringPiece& rawVal); /** @@ -189,7 +189,8 @@ class JobDescription { static bool isJobKey(const folly::StringPiece& rawKey); bool operator==(const JobDescription& that) const { - return this->cmd_ == that.cmd_ && this->paras_ == that.paras_ && this->status_ == that.status_; + return this->type_ == that.type_ && this->paras_ == that.paras_ && + this->status_ == that.status_; } bool operator!=(const JobDescription& that) const { @@ -207,12 +208,12 @@ class JobDescription { * @param rawVal * @return */ - static std::tuple, Status, int64_t, int64_t> decodeValV1( + static std::tuple, Status, int64_t, int64_t> decodeValV1( const folly::StringPiece& rawVal); private: JobID id_; - cpp2::AdminCmd cmd_; + cpp2::JobType type_; std::vector paras_; Status status_; int64_t startTime_; diff --git a/src/meta/processors/job/JobExecutor.cpp b/src/meta/processors/job/JobExecutor.cpp index e44d2847610..1dcde13e974 100644 --- a/src/meta/processors/job/JobExecutor.cpp +++ b/src/meta/processors/job/JobExecutor.cpp @@ -47,32 +47,32 @@ std::unique_ptr JobExecutorFactory::createJobExecutor(const JobDesc kvstore::KVStore* store, AdminClient* client) { std::unique_ptr ret; - switch (jd.getCmd()) { - case cpp2::AdminCmd::COMPACT: + switch (jd.getJobType()) { + case cpp2::JobType::COMPACT: ret.reset(new CompactJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::AdminCmd::DATA_BALANCE: + case cpp2::JobType::DATA_BALANCE: ret.reset(new DataBalanceJobExecutor(jd, store, client, jd.getParas())); break; - case cpp2::AdminCmd::ZONE_BALANCE: + case cpp2::JobType::ZONE_BALANCE: ret.reset(new ZoneBalanceJobExecutor(jd, store, client, jd.getParas())); break; - case cpp2::AdminCmd::LEADER_BALANCE: + case cpp2::JobType::LEADER_BALANCE: ret.reset(new LeaderBalanceJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::AdminCmd::FLUSH: + case cpp2::JobType::FLUSH: ret.reset(new FlushJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::AdminCmd::REBUILD_TAG_INDEX: + case cpp2::JobType::REBUILD_TAG_INDEX: ret.reset(new RebuildTagJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::AdminCmd::REBUILD_EDGE_INDEX: + case cpp2::JobType::REBUILD_EDGE_INDEX: ret.reset(new RebuildEdgeJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: + case cpp2::JobType::REBUILD_FULLTEXT_INDEX: ret.reset(new RebuildFTJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; - case cpp2::AdminCmd::STATS: + case cpp2::JobType::STATS: ret.reset(new StatsJobExecutor(jd.getJobId(), store, client, jd.getParas())); break; default: diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 7aed996f81b..81b79d030e4 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -148,7 +148,8 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { JobExecutor* jobExec = je.get(); runningJobs_.emplace(jobDesc.getJobId(), std::move(je)); if (jobExec == nullptr) { - LOG(INFO) << "unreconized job cmd " << apache::thrift::util::enumNameSafe(jobDesc.getCmd()); + LOG(INFO) << "unreconized job type " + << apache::thrift::util::enumNameSafe(jobDesc.getJobType()); return false; } @@ -393,7 +394,7 @@ nebula::cpp2::ErrorCode JobManager::addJob(const JobDescription& jobDesc, AdminC auto rc = save(jobDesc.jobKey(), jobDesc.jobVal()); if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { auto jobId = jobDesc.getJobId(); - enqueue(JbOp::ADD, jobId, jobDesc.getCmd()); + enqueue(JbOp::ADD, jobId, jobDesc.getJobType()); // Add job to jobMap inFlightJobs_.emplace(jobId, jobDesc); } else { @@ -420,8 +421,8 @@ bool JobManager::try_dequeue(std::pair& opJobId) { return false; } -void JobManager::enqueue(const JbOp& op, const JobID& jobId, const cpp2::AdminCmd& cmd) { - if (cmd == cpp2::AdminCmd::STATS) { +void JobManager::enqueue(const JbOp& op, const JobID& jobId, const cpp2::JobType& jobType) { + if (jobType == cpp2::JobType::STATS) { highPriorityQueue_->enqueue(std::make_pair(op, jobId)); } else { lowPriorityQueue_->enqueue(std::make_pair(op, jobId)); @@ -515,10 +516,10 @@ nebula::cpp2::ErrorCode JobManager::removeExpiredJobs( return ret; } -bool JobManager::checkJobExist(const cpp2::AdminCmd& cmd, +bool JobManager::checkJobExist(const cpp2::JobType& jobType, const std::vector& paras, JobID& iJob) { - JobDescription jobDesc(0, cmd, paras); + JobDescription jobDesc(0, jobType, paras); auto it = inFlightJobs_.begin(); while (it != inFlightJobs_.end()) { if (it->second == jobDesc) { @@ -562,8 +563,8 @@ JobManager::showJob(JobID iJob, const std::string& spaceName) { ret.second.emplace_back(td.toTaskDesc()); } } - if (ret.first.get_cmd() == meta::cpp2::AdminCmd::DATA_BALANCE || - ret.first.get_cmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) { + if (ret.first.get_type() == meta::cpp2::JobType::DATA_BALANCE || + ret.first.get_type() == meta::cpp2::JobType::ZONE_BALANCE) { auto res = BalancePlan::show(iJob, kvStore_, adminClient_); if (ok(res)) { std::vector thriftTasks = value(res); @@ -648,11 +649,11 @@ ErrorOr JobManager::recoverJob( optJob.getStatus() == cpp2::JobStatus::STOPPED))) { // Check if the job exists JobID jId = 0; - auto jobExist = checkJobExist(optJob.getCmd(), optJob.getParas(), jId); + auto jobExist = checkJobExist(optJob.getJobType(), optJob.getParas(), jId); if (!jobExist) { auto jobId = optJob.getJobId(); - enqueue(JbOp::RECOVER, jobId, optJob.getCmd()); + enqueue(JbOp::RECOVER, jobId, optJob.getJobType()); inFlightJobs_.emplace(jobId, optJob); ++recoveredJobNum; } @@ -708,8 +709,9 @@ ErrorOr JobManager::checkIndexJobRunning() { if (!isRunningJob(jobDesc)) { continue; } - auto cmd = jobDesc.getCmd(); - if (cmd == cpp2::AdminCmd::REBUILD_TAG_INDEX || cmd == cpp2::AdminCmd::REBUILD_EDGE_INDEX) { + auto jobType = jobDesc.getJobType(); + if (jobType == cpp2::JobType::REBUILD_TAG_INDEX || + jobType == cpp2::JobType::REBUILD_EDGE_INDEX) { return true; } } diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 16d0ccc61d4..837d6dbc2e1 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -87,12 +87,12 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { /** * @brief The same job is in jobMap * - * @param cmd + * @param type * @param paras * @param iJob * @return */ - bool checkJobExist(const cpp2::AdminCmd& cmd, const std::vector& paras, JobID& iJob); + bool checkJobExist(const cpp2::JobType& type, const std::vector& paras, JobID& iJob); /** * @brief Load all jobs of the space from kvStore and convert to cpp2::JobDesc @@ -174,9 +174,9 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { * * @param op Recover a job or add a new one * @param jobId Id of the job - * @param cmd Cmd type of the job + * @param jobType Type of the job */ - void enqueue(const JbOp& op, const JobID& jobId, const cpp2::AdminCmd& cmd); + void enqueue(const JbOp& op, const JobID& jobId, const cpp2::JobType& jobType); /** * @brief Check if there is a rebuild_tag_index or rebuild_edge_index running @@ -257,7 +257,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { private: // Todo(pandasheep) // When folly is upgraded, PriorityUMPSCQueueSet can be used - // Use two queues to simulate priority queue, Divide by job cmd + // Use two queues to simulate priority queue, Divide by job type std::unique_ptr, true>> lowPriorityQueue_; std::unique_ptr, true>> highPriorityQueue_; std::map> runningJobs_; diff --git a/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp b/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp index 62796909f07..4a5a35eac0d 100644 --- a/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp +++ b/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp @@ -30,7 +30,7 @@ void ListEdgeIndexStatusProcessor::process(const cpp2::ListIndexStatusReq& req) } auto optJob = nebula::value(optJobRet); auto jobDesc = optJob.toJobDesc(); - if (jobDesc.get_cmd() == meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX) { + if (jobDesc.get_type() == meta::cpp2::JobType::REBUILD_EDGE_INDEX) { auto paras = jobDesc.get_paras(); DCHECK_GE(paras.size(), 1); auto spaceName = paras.back(); diff --git a/src/meta/processors/job/ListTagIndexStatusProcessor.cpp b/src/meta/processors/job/ListTagIndexStatusProcessor.cpp index d7f184a5690..ea003fd1bb1 100644 --- a/src/meta/processors/job/ListTagIndexStatusProcessor.cpp +++ b/src/meta/processors/job/ListTagIndexStatusProcessor.cpp @@ -31,7 +31,7 @@ void ListTagIndexStatusProcessor::process(const cpp2::ListIndexStatusReq& req) { auto optJob = nebula::value(optJobRet); auto jobDesc = optJob.toJobDesc(); - if (jobDesc.get_cmd() == cpp2::AdminCmd::REBUILD_TAG_INDEX) { + if (jobDesc.get_type() == cpp2::JobType::REBUILD_TAG_INDEX) { auto paras = jobDesc.get_paras(); DCHECK_GE(paras.size(), 1); auto spaceName = paras.back(); diff --git a/src/meta/processors/job/RebuildEdgeJobExecutor.cpp b/src/meta/processors/job/RebuildEdgeJobExecutor.cpp index c37c169f328..ceb1911a16b 100644 --- a/src/meta/processors/job/RebuildEdgeJobExecutor.cpp +++ b/src/meta/processors/job/RebuildEdgeJobExecutor.cpp @@ -13,7 +13,7 @@ folly::Future RebuildEdgeJobExecutor::executeInternal(HostAddr&& address folly::Promise pro; auto f = pro.getFuture(); adminClient_ - ->addTask(cpp2::AdminCmd::REBUILD_EDGE_INDEX, + ->addTask(cpp2::JobType::REBUILD_EDGE_INDEX, jobId_, taskId_++, space_, diff --git a/src/meta/processors/job/RebuildFTJobExecutor.cpp b/src/meta/processors/job/RebuildFTJobExecutor.cpp index 13ce778f758..6e9f9071a77 100644 --- a/src/meta/processors/job/RebuildFTJobExecutor.cpp +++ b/src/meta/processors/job/RebuildFTJobExecutor.cpp @@ -13,7 +13,7 @@ folly::Future RebuildFTJobExecutor::executeInternal(HostAddr&& address, folly::Promise pro; auto f = pro.getFuture(); adminClient_ - ->addTask(cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX, + ->addTask(cpp2::JobType::REBUILD_FULLTEXT_INDEX, jobId_, taskId_++, space_, diff --git a/src/meta/processors/job/RebuildTagJobExecutor.cpp b/src/meta/processors/job/RebuildTagJobExecutor.cpp index 8a91b8e20a6..53ad89fb94e 100644 --- a/src/meta/processors/job/RebuildTagJobExecutor.cpp +++ b/src/meta/processors/job/RebuildTagJobExecutor.cpp @@ -13,7 +13,7 @@ folly::Future RebuildTagJobExecutor::executeInternal(HostAddr&& address, folly::Promise pro; auto f = pro.getFuture(); adminClient_ - ->addTask(cpp2::AdminCmd::REBUILD_TAG_INDEX, + ->addTask(cpp2::JobType::REBUILD_TAG_INDEX, jobId_, taskId_++, space_, diff --git a/src/meta/processors/job/StatsJobExecutor.cpp b/src/meta/processors/job/StatsJobExecutor.cpp index d012b462be6..ee72d283c41 100644 --- a/src/meta/processors/job/StatsJobExecutor.cpp +++ b/src/meta/processors/job/StatsJobExecutor.cpp @@ -63,13 +63,8 @@ folly::Future StatsJobExecutor::executeInternal(HostAddr&& address, folly::Promise pro; auto f = pro.getFuture(); adminClient_ - ->addTask(cpp2::AdminCmd::STATS, - jobId_, - taskId_++, - space_, - std::move(address), - {}, - std::move(parts)) + ->addTask( + cpp2::JobType::STATS, jobId_, taskId_++, space_, std::move(address), {}, std::move(parts)) .then([pro = std::move(pro)](auto&& t) mutable { CHECK(!t.hasException()); auto status = std::move(t).value(); diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 280f7d4bb5b..2e3434ab0f7 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -466,7 +466,7 @@ TEST(BalanceTest, DispatchTasksTest) { { FLAGS_task_concurrency = 10; JobDescription jd( - testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::AdminCmd::DATA_BALANCE, {}); + testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::JobType::DATA_BALANCE, {}); BalancePlan plan(jd, nullptr, nullptr); for (int i = 0; i < 20; i++) { BalanceTask task(0, @@ -487,7 +487,7 @@ TEST(BalanceTest, DispatchTasksTest) { { FLAGS_task_concurrency = 10; JobDescription jd( - testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::AdminCmd::DATA_BALANCE, {}); + testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::JobType::DATA_BALANCE, {}); BalancePlan plan(jd, nullptr, nullptr); for (int i = 0; i < 5; i++) { BalanceTask task(0, @@ -508,7 +508,7 @@ TEST(BalanceTest, DispatchTasksTest) { { FLAGS_task_concurrency = 20; JobDescription jd( - testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::AdminCmd::DATA_BALANCE, {}); + testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::JobType::DATA_BALANCE, {}); BalancePlan plan(jd, nullptr, nullptr); for (int i = 0; i < 5; i++) { BalanceTask task(0, @@ -558,7 +558,7 @@ TEST(BalanceTest, BalancePlanTest) { LOG(INFO) << "Test with all tasks succeeded, only one bucket!"; NiceMock client; JobDescription jd( - testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::AdminCmd::DATA_BALANCE, {}); + testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::JobType::DATA_BALANCE, {}); BalancePlan plan(jd, kv, &client); TestUtils::createSomeHosts(kv, hosts); TestUtils::registerHB(kv, hosts); @@ -590,7 +590,7 @@ TEST(BalanceTest, BalancePlanTest) { LOG(INFO) << "Test with all tasks succeeded, 10 buckets!"; NiceMock client; JobDescription jd( - testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::AdminCmd::DATA_BALANCE, {}); + testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::JobType::DATA_BALANCE, {}); BalancePlan plan(jd, kv, &client); TestUtils::registerHB(kv, hosts); @@ -622,7 +622,7 @@ TEST(BalanceTest, BalancePlanTest) { { LOG(INFO) << "Test with one task failed, 10 buckets"; JobDescription jd( - testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::AdminCmd::DATA_BALANCE, {}); + testJobId.fetch_add(1, std::memory_order_relaxed), cpp2::JobType::DATA_BALANCE, {}); BalancePlan plan(jd, kv, nullptr); NiceMock client1, client2; { @@ -735,8 +735,8 @@ void verifyZonePartNum(kvstore::KVStore* kv, EXPECT_EQ(zoneNum, zones); } -JobDescription makeJobDescription(kvstore::KVStore* kv, cpp2::AdminCmd cmd) { - JobDescription jd(testJobId.fetch_add(1, std::memory_order_relaxed), cmd, {}); +JobDescription makeJobDescription(kvstore::KVStore* kv, cpp2::JobType jobType) { + JobDescription jd(testJobId.fetch_add(1, std::memory_order_relaxed), jobType, {}); std::vector data; data.emplace_back(jd.jobKey(), jd.jobVal()); folly::Baton baton; @@ -759,7 +759,7 @@ TEST(BalanceTest, NormalZoneTest) { DefaultValue>::SetFactory( [] { return folly::Future(Status::OK()); }); NiceMock client; - JobDescription jd = makeJobDescription(kv, cpp2::AdminCmd::ZONE_BALANCE); + JobDescription jd = makeJobDescription(kv, cpp2::JobType::ZONE_BALANCE); ZoneBalanceJobExecutor balancer(jd, kv, &client, {}); balancer.spaceInfo_.loadInfo(1, kv); folly::Baton baton; @@ -795,7 +795,7 @@ TEST(BalanceTest, NormalDataTest) { DefaultValue>::SetFactory( [] { return folly::Future(Status::OK()); }); NiceMock client; - JobDescription jd = makeJobDescription(kv, cpp2::AdminCmd::DATA_BALANCE); + JobDescription jd = makeJobDescription(kv, cpp2::JobType::DATA_BALANCE); DataBalanceJobExecutor balancer(jd, kv, &client, {}); balancer.spaceInfo_.loadInfo(1, kv); auto ret = balancer.executeInternal(); @@ -831,7 +831,7 @@ TEST(BalanceTest, RecoveryTest) { .WillOnce(Return(ByMove(folly::Future(Status::Error("catch up failed"))))) .WillOnce(Return(ByMove(folly::Future(Status::Error("catch up failed"))))); - JobDescription jd = makeJobDescription(kv, cpp2::AdminCmd::DATA_BALANCE); + JobDescription jd = makeJobDescription(kv, cpp2::JobType::DATA_BALANCE); DataBalanceJobExecutor balancer(jd, kv, &client, {}); balancer.spaceInfo_.loadInfo(1, kv); balancer.lostHosts_ = {{"127.0.0.1", 1}, {"127.0.0.1", 8}}; @@ -897,7 +897,7 @@ TEST(BalanceTest, StopPlanTest) { .WillOnce( Return(ByMove(folly::makeFuture(Status::OK()).delayed(std::chrono::seconds(3))))); FLAGS_task_concurrency = 8; - JobDescription jd = makeJobDescription(kv, cpp2::AdminCmd::DATA_BALANCE); + JobDescription jd = makeJobDescription(kv, cpp2::JobType::DATA_BALANCE); ZoneBalanceJobExecutor balancer(jd, kv, &delayClient, {}); balancer.spaceInfo_.loadInfo(1, kv); balancer.lostZones_ = {"4", "5"}; diff --git a/src/meta/test/GetStatsTest.cpp b/src/meta/test/GetStatsTest.cpp index 34c32b584ed..36a6c8a2d69 100644 --- a/src/meta/test/GetStatsTest.cpp +++ b/src/meta/test/GetStatsTest.cpp @@ -125,7 +125,7 @@ TEST_F(GetStatsTest, StatsJob) { TestUtils::assembleSpace(kv_.get(), 1, 1); GraphSpaceID spaceId = 1; std::vector paras{"test_space"}; - JobDescription statsJob(12, cpp2::AdminCmd::STATS, paras); + JobDescription statsJob(12, cpp2::JobType::STATS, paras); NiceMock adminClient; jobMgr->adminClient_ = &adminClient; auto rc = jobMgr->save(statsJob.jobKey(), statsJob.jobVal()); @@ -217,7 +217,7 @@ TEST_F(GetStatsTest, StatsJob) { // Execute new stats job in same space. std::vector paras1{"test_space"}; - JobDescription statsJob2(13, cpp2::AdminCmd::STATS, paras1); + JobDescription statsJob2(13, cpp2::JobType::STATS, paras1); auto rc2 = jobMgr->save(statsJob2.jobKey(), statsJob2.jobVal()); ASSERT_EQ(rc2, nebula::cpp2::ErrorCode::SUCCEEDED); { @@ -377,7 +377,7 @@ TEST_F(GetStatsTest, MockSingleMachineTest) { // add stats job1 JobID jobId1 = 1; std::vector paras{"test_space"}; - JobDescription job1(jobId1, cpp2::AdminCmd::STATS, paras); + JobDescription job1(jobId1, cpp2::JobType::STATS, paras); jobMgr->addJob(job1, &adminClient); JobCallBack cb1(jobMgr, jobId1, 0, 100); @@ -425,7 +425,7 @@ TEST_F(GetStatsTest, MockSingleMachineTest) { // add stats job2 of same space JobID jobId2 = 2; - JobDescription job2(jobId2, cpp2::AdminCmd::STATS, paras); + JobDescription job2(jobId2, cpp2::JobType::STATS, paras); jobMgr->addJob(job2, &adminClient); // check job result @@ -495,7 +495,7 @@ TEST_F(GetStatsTest, MockMultiMachineTest) { // add stats job JobID jobId = 1; std::vector paras{"test_space"}; - JobDescription job(jobId, cpp2::AdminCmd::STATS, paras); + JobDescription job(jobId, cpp2::JobType::STATS, paras); jobMgr->addJob(job, &adminClient); JobCallBack cb1(jobMgr, jobId, 0, 100); diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 1963f176299..1429a9c67ce 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -76,7 +76,7 @@ class JobManagerTest : public ::testing::Test { TEST_F(JobManagerTest, AddJob) { std::unique_ptr> jobMgr = getJobManager(); std::vector paras{"test"}; - JobDescription job(1, cpp2::AdminCmd::COMPACT, paras); + JobDescription job(1, cpp2::JobType::COMPACT, paras); auto rc = jobMgr->addJob(job, adminClient_.get()); ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); } @@ -87,7 +87,7 @@ TEST_F(JobManagerTest, AddRebuildTagIndexJob) { jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); std::vector paras{"tag_index_name", "test_space"}; - JobDescription job(11, cpp2::AdminCmd::REBUILD_TAG_INDEX, paras); + JobDescription job(11, cpp2::JobType::REBUILD_TAG_INDEX, paras); auto rc = jobMgr->addJob(job, adminClient_.get()); ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); auto result = jobMgr->runJobInternal(job, JobManager::JbOp::ADD); @@ -100,7 +100,7 @@ TEST_F(JobManagerTest, AddRebuildEdgeIndexJob) { jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); std::vector paras{"edge_index_name", "test_space"}; - JobDescription job(11, cpp2::AdminCmd::REBUILD_EDGE_INDEX, paras); + JobDescription job(11, cpp2::JobType::REBUILD_EDGE_INDEX, paras); auto rc = jobMgr->addJob(job, adminClient_.get()); ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); auto result = jobMgr->runJobInternal(job, JobManager::JbOp::ADD); @@ -113,7 +113,7 @@ TEST_F(JobManagerTest, StatsJob) { jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; jobMgr->bgThread_.join(); std::vector paras{"test_space"}; - JobDescription job(12, cpp2::AdminCmd::STATS, paras); + JobDescription job(12, cpp2::JobType::STATS, paras); auto rc = jobMgr->addJob(job, adminClient_.get()); ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); auto result = jobMgr->runJobInternal(job, JobManager::JbOp::ADD); @@ -137,12 +137,12 @@ TEST_F(JobManagerTest, JobPriority) { ASSERT_EQ(0, jobMgr->jobSize()); std::vector paras{"test"}; - JobDescription job1(13, cpp2::AdminCmd::COMPACT, paras); + JobDescription job1(13, cpp2::JobType::COMPACT, paras); auto rc1 = jobMgr->addJob(job1, adminClient_.get()); ASSERT_EQ(rc1, nebula::cpp2::ErrorCode::SUCCEEDED); std::vector paras1{"test_space"}; - JobDescription job2(14, cpp2::AdminCmd::STATS, paras1); + JobDescription job2(14, cpp2::JobType::STATS, paras1); auto rc2 = jobMgr->addJob(job2, adminClient_.get()); ASSERT_EQ(rc2, nebula::cpp2::ErrorCode::SUCCEEDED); @@ -171,28 +171,28 @@ TEST_F(JobManagerTest, JobDeduplication) { ASSERT_EQ(0, jobMgr->jobSize()); std::vector paras{"test"}; - JobDescription job1(15, cpp2::AdminCmd::COMPACT, paras); + JobDescription job1(15, cpp2::JobType::COMPACT, paras); auto rc1 = jobMgr->addJob(job1, adminClient_.get()); ASSERT_EQ(rc1, nebula::cpp2::ErrorCode::SUCCEEDED); std::vector paras1{"test_space"}; - JobDescription job2(16, cpp2::AdminCmd::STATS, paras1); + JobDescription job2(16, cpp2::JobType::STATS, paras1); auto rc2 = jobMgr->addJob(job2, adminClient_.get()); ASSERT_EQ(rc2, nebula::cpp2::ErrorCode::SUCCEEDED); ASSERT_EQ(2, jobMgr->jobSize()); - JobDescription job3(17, cpp2::AdminCmd::STATS, paras1); + JobDescription job3(17, cpp2::JobType::STATS, paras1); JobID jId3 = 0; - auto jobExist = jobMgr->checkJobExist(job3.getCmd(), job3.getParas(), jId3); + auto jobExist = jobMgr->checkJobExist(job3.getJobType(), job3.getParas(), jId3); if (!jobExist) { auto rc3 = jobMgr->addJob(job3, adminClient_.get()); ASSERT_EQ(rc3, nebula::cpp2::ErrorCode::SUCCEEDED); } - JobDescription job4(18, cpp2::AdminCmd::COMPACT, paras); + JobDescription job4(18, cpp2::JobType::COMPACT, paras); JobID jId4 = 0; - jobExist = jobMgr->checkJobExist(job4.getCmd(), job4.getParas(), jId4); + jobExist = jobMgr->checkJobExist(job4.getJobType(), job4.getParas(), jId4); if (!jobExist) { auto rc4 = jobMgr->addJob(job4, adminClient_.get()); ASSERT_NE(rc4, nebula::cpp2::ErrorCode::SUCCEEDED); @@ -217,13 +217,13 @@ TEST_F(JobManagerTest, JobDeduplication) { TEST_F(JobManagerTest, LoadJobDescription) { std::unique_ptr> jobMgr = getJobManager(); std::vector paras{"test_space"}; - JobDescription job1(1, cpp2::AdminCmd::COMPACT, paras); + JobDescription job1(1, cpp2::JobType::COMPACT, paras); job1.setStatus(cpp2::JobStatus ::RUNNING); job1.setStatus(cpp2::JobStatus::FINISHED); auto rc = jobMgr->addJob(job1, adminClient_.get()); ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED); ASSERT_EQ(job1.id_, 1); - ASSERT_EQ(job1.cmd_, cpp2::AdminCmd::COMPACT); + ASSERT_EQ(job1.type_, cpp2::JobType::COMPACT); ASSERT_EQ(job1.paras_[0], "test_space"); auto optJd2Ret = JobDescription::loadJobDescription(job1.id_, kv_.get()); @@ -231,7 +231,7 @@ TEST_F(JobManagerTest, LoadJobDescription) { auto optJd2 = nebula::value(optJd2Ret); ASSERT_EQ(job1.id_, optJd2.id_); LOG(INFO) << "job1.id_ = " << job1.id_; - ASSERT_EQ(job1.cmd_, optJd2.cmd_); + ASSERT_EQ(job1.type_, optJd2.type_); ASSERT_EQ(job1.paras_, optJd2.paras_); ASSERT_EQ(job1.status_, optJd2.status_); ASSERT_EQ(job1.startTime_, optJd2.startTime_); @@ -245,13 +245,13 @@ TEST(JobUtilTest, Dummy) { TEST_F(JobManagerTest, ShowJobs) { std::unique_ptr> jobMgr = getJobManager(); std::vector paras1{"test_space"}; - JobDescription jd1(1, cpp2::AdminCmd::COMPACT, paras1); + JobDescription jd1(1, cpp2::JobType::COMPACT, paras1); jd1.setStatus(cpp2::JobStatus::RUNNING); jd1.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd1, adminClient_.get()); std::vector paras2{"test_space"}; - JobDescription jd2(2, cpp2::AdminCmd::FLUSH, paras2); + JobDescription jd2(2, cpp2::JobType::FLUSH, paras2); jd2.setStatus(cpp2::JobStatus::RUNNING); jd2.setStatus(cpp2::JobStatus::FAILED); jobMgr->addJob(jd2, adminClient_.get()); @@ -262,14 +262,14 @@ TEST_F(JobManagerTest, ShowJobs) { auto& jobs = nebula::value(statusOrShowResult); ASSERT_EQ(jobs[1].get_id(), jd1.id_); - ASSERT_EQ(jobs[1].get_cmd(), cpp2::AdminCmd::COMPACT); + ASSERT_EQ(jobs[1].get_type(), cpp2::JobType::COMPACT); ASSERT_EQ(jobs[1].get_paras()[0], "test_space"); ASSERT_EQ(jobs[1].get_status(), cpp2::JobStatus::FINISHED); ASSERT_EQ(jobs[1].get_start_time(), jd1.startTime_); ASSERT_EQ(jobs[1].get_stop_time(), jd1.stopTime_); ASSERT_EQ(jobs[0].get_id(), jd2.id_); - ASSERT_EQ(jobs[0].get_cmd(), cpp2::AdminCmd::FLUSH); + ASSERT_EQ(jobs[0].get_type(), cpp2::JobType::FLUSH); ASSERT_EQ(jobs[0].get_paras()[0], "test_space"); ASSERT_EQ(jobs[0].get_status(), cpp2::JobStatus::FAILED); ASSERT_EQ(jobs[0].get_start_time(), jd2.startTime_); @@ -279,13 +279,13 @@ TEST_F(JobManagerTest, ShowJobs) { TEST_F(JobManagerTest, ShowJobsFromMultiSpace) { std::unique_ptr> jobMgr = getJobManager(); std::vector paras1{"test_space"}; - JobDescription jd1(1, cpp2::AdminCmd::COMPACT, paras1); + JobDescription jd1(1, cpp2::JobType::COMPACT, paras1); jd1.setStatus(cpp2::JobStatus::RUNNING); jd1.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd1, adminClient_.get()); std::vector paras2{"test_space2"}; - JobDescription jd2(2, cpp2::AdminCmd::FLUSH, paras2); + JobDescription jd2(2, cpp2::JobType::FLUSH, paras2); jd2.setStatus(cpp2::JobStatus::RUNNING); jd2.setStatus(cpp2::JobStatus::FAILED); jobMgr->addJob(jd2, adminClient_.get()); @@ -298,7 +298,7 @@ TEST_F(JobManagerTest, ShowJobsFromMultiSpace) { ASSERT_EQ(jobs.size(), 1); ASSERT_EQ(jobs[0].get_id(), jd2.id_); - ASSERT_EQ(jobs[0].get_cmd(), cpp2::AdminCmd::FLUSH); + ASSERT_EQ(jobs[0].get_type(), cpp2::JobType::FLUSH); ASSERT_EQ(jobs[0].get_paras()[0], "test_space2"); ASSERT_EQ(jobs[0].get_status(), cpp2::JobStatus::FAILED); ASSERT_EQ(jobs[0].get_start_time(), jd2.startTime_); @@ -313,7 +313,7 @@ TEST_F(JobManagerTest, ShowJob) { std::unique_ptr> jobMgr = getJobManager(); std::vector paras{"test_space"}; - JobDescription jd(1, cpp2::AdminCmd::COMPACT, paras); + JobDescription jd(1, cpp2::JobType::COMPACT, paras); jd.setStatus(cpp2::JobStatus::RUNNING); jd.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd, adminClient_.get()); @@ -342,7 +342,7 @@ TEST_F(JobManagerTest, ShowJob) { auto& tasks = nebula::value(showResult).second; ASSERT_EQ(jobs.get_id(), iJob); - ASSERT_EQ(jobs.get_cmd(), cpp2::AdminCmd::COMPACT); + ASSERT_EQ(jobs.get_type(), cpp2::JobType::COMPACT); ASSERT_EQ(jobs.get_paras()[0], "test_space"); ASSERT_EQ(jobs.get_status(), cpp2::JobStatus::FINISHED); ASSERT_EQ(jobs.get_start_time(), jd.startTime_); @@ -367,7 +367,7 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { std::unique_ptr> jobMgr = getJobManager(); std::vector paras{"test_space"}; - JobDescription jd(1, cpp2::AdminCmd::COMPACT, paras); + JobDescription jd(1, cpp2::JobType::COMPACT, paras); jd.setStatus(cpp2::JobStatus::RUNNING); jd.setStatus(cpp2::JobStatus::FINISHED); jobMgr->addJob(jd, adminClient_.get()); @@ -403,7 +403,7 @@ TEST_F(JobManagerTest, RecoverJob) { auto spaceName = "test_space"; int32_t nJob = 3; for (auto i = 0; i != nJob; ++i) { - JobDescription jd(i, cpp2::AdminCmd::FLUSH, {spaceName}); + JobDescription jd(i, cpp2::JobType::FLUSH, {spaceName}); jobMgr->save(jd.jobKey(), jd.jobVal()); } @@ -413,7 +413,7 @@ TEST_F(JobManagerTest, RecoverJob) { TEST(JobDescriptionTest, Ctor) { std::vector paras1{"test_space"}; - JobDescription jd1(1, cpp2::AdminCmd::COMPACT, paras1); + JobDescription jd1(1, cpp2::JobType::COMPACT, paras1); jd1.setStatus(cpp2::JobStatus::RUNNING); jd1.setStatus(cpp2::JobStatus::FINISHED); LOG(INFO) << "jd1 ctored"; @@ -421,7 +421,7 @@ TEST(JobDescriptionTest, Ctor) { TEST(JobDescriptionTest, Ctor2) { std::vector paras1{"test_space"}; - JobDescription jd1(1, cpp2::AdminCmd::COMPACT, paras1); + JobDescription jd1(1, cpp2::JobType::COMPACT, paras1); jd1.setStatus(cpp2::JobStatus::RUNNING); jd1.setStatus(cpp2::JobStatus::FINISHED); LOG(INFO) << "jd1 ctored"; @@ -434,7 +434,7 @@ TEST(JobDescriptionTest, Ctor2) { TEST(JobDescriptionTest, Ctor3) { std::vector paras1{"test_space"}; - JobDescription jd1(1, cpp2::AdminCmd::COMPACT, paras1); + JobDescription jd1(1, cpp2::JobType::COMPACT, paras1); jd1.setStatus(cpp2::JobStatus::RUNNING); jd1.setStatus(cpp2::JobStatus::FINISHED); LOG(INFO) << "jd1 ctored"; @@ -450,10 +450,10 @@ TEST(JobDescriptionTest, Ctor3) { TEST(JobDescriptionTest, ParseKey) { int32_t iJob = std::pow(2, 16); std::vector paras{"test_space"}; - JobDescription jd(iJob, cpp2::AdminCmd::COMPACT, paras); + JobDescription jd(iJob, cpp2::JobType::COMPACT, paras); auto sKey = jd.jobKey(); ASSERT_EQ(iJob, jd.getJobId()); - ASSERT_EQ(cpp2::AdminCmd::COMPACT, jd.getCmd()); + ASSERT_EQ(cpp2::JobType::COMPACT, jd.getJobType()); folly::StringPiece spKey(&sKey[0], sKey.length()); auto parsedKeyId = JobDescription::parseKey(spKey); @@ -463,7 +463,7 @@ TEST(JobDescriptionTest, ParseKey) { TEST(JobDescriptionTest, ParseVal) { int32_t iJob = std::pow(2, 15); std::vector paras{"nba"}; - JobDescription jd(iJob, cpp2::AdminCmd::FLUSH, paras); + JobDescription jd(iJob, cpp2::JobType::FLUSH, paras); auto status = cpp2::JobStatus::FINISHED; jd.setStatus(cpp2::JobStatus::RUNNING); jd.setStatus(status); @@ -473,7 +473,7 @@ TEST(JobDescriptionTest, ParseVal) { auto strVal = jd.jobVal(); folly::StringPiece rawVal(&strVal[0], strVal.length()); auto parsedVal = JobDescription::parseVal(rawVal); - ASSERT_EQ(cpp2::AdminCmd::FLUSH, std::get<0>(parsedVal)); + ASSERT_EQ(cpp2::JobType::FLUSH, std::get<0>(parsedVal)); ASSERT_EQ(paras, std::get<1>(parsedVal)); ASSERT_EQ(status, std::get<2>(parsedVal)); ASSERT_EQ(startTime, std::get<3>(parsedVal)); diff --git a/src/meta/test/MockAdminClient.h b/src/meta/test/MockAdminClient.h index e7008393165..5e5f0a60c5a 100644 --- a/src/meta/test/MockAdminClient.h +++ b/src/meta/test/MockAdminClient.h @@ -41,7 +41,7 @@ class MockAdminClient : public AdminClient { storage::cpp2::EngineSignType, const HostAddr&)); MOCK_METHOD7(addTask, - folly::Future>(cpp2::AdminCmd, + folly::Future>(cpp2::JobType, int32_t, int32_t, GraphSpaceID, diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index b2318bd0fb4..c9a1e2ff13c 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -243,26 +243,26 @@ std::string ShowListenerSentence::toString() const { std::string AdminJobSentence::toString() const { switch (op_) { - case meta::cpp2::AdminJobOp::ADD: { - switch (cmd_) { - case meta::cpp2::AdminCmd::COMPACT: + case meta::cpp2::JobOp::ADD: { + switch (type_) { + case meta::cpp2::JobType::COMPACT: return "SUBMIT JOB COMPACT"; - case meta::cpp2::AdminCmd::FLUSH: + case meta::cpp2::JobType::FLUSH: return "SUBMIT JOB FLUSH"; - case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX: + case meta::cpp2::JobType::REBUILD_TAG_INDEX: return folly::stringPrintf("REBUILD TAG INDEX %s", folly::join(",", paras_).c_str()); - case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX: + case meta::cpp2::JobType::REBUILD_EDGE_INDEX: return folly::stringPrintf("REBUILD EDGE INDEX %s", folly::join(",", paras_).c_str()); - case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: + case meta::cpp2::JobType::REBUILD_FULLTEXT_INDEX: return "REBUILD FULLTEXT INDEX"; - case meta::cpp2::AdminCmd::STATS: + case meta::cpp2::JobType::STATS: return "SUBMIT JOB STATS"; - case meta::cpp2::AdminCmd::DOWNLOAD: + case meta::cpp2::JobType::DOWNLOAD: return paras_.empty() ? "DOWNLOAD HDFS " : folly::stringPrintf("DOWNLOAD HDFS %s", paras_[0].c_str()); - case meta::cpp2::AdminCmd::INGEST: + case meta::cpp2::JobType::INGEST: return "INGEST"; - case meta::cpp2::AdminCmd::DATA_BALANCE: + case meta::cpp2::JobType::DATA_BALANCE: if (paras_.empty()) { return "SUBMIT JOB BALANCE IN ZONE"; } else { @@ -273,7 +273,7 @@ std::string AdminJobSentence::toString() const { } return str; } - case meta::cpp2::AdminCmd::ZONE_BALANCE: + case meta::cpp2::JobType::ZONE_BALANCE: if (paras_.empty()) { return "SUBMIT JOB BALANCE ACROSS ZONE"; } else { @@ -284,22 +284,22 @@ std::string AdminJobSentence::toString() const { } return str; } - case meta::cpp2::AdminCmd::LEADER_BALANCE: + case meta::cpp2::JobType::LEADER_BALANCE: return "SUBMIT JOB BALANCE LEADER"; - case meta::cpp2::AdminCmd::UNKNOWN: - return folly::stringPrintf("Unsupported AdminCmd: %s", - apache::thrift::util::enumNameSafe(cmd_).c_str()); + case meta::cpp2::JobType::UNKNOWN: + return folly::stringPrintf("Unsupported JobType: %s", + apache::thrift::util::enumNameSafe(type_).c_str()); } } - case meta::cpp2::AdminJobOp::SHOW_All: + case meta::cpp2::JobOp::SHOW_All: return "SHOW JOBS"; - case meta::cpp2::AdminJobOp::SHOW: + case meta::cpp2::JobOp::SHOW: CHECK_EQ(paras_.size(), 1U); return folly::stringPrintf("SHOW JOB %s", paras_[0].c_str()); - case meta::cpp2::AdminJobOp::STOP: + case meta::cpp2::JobOp::STOP: CHECK_EQ(paras_.size(), 1U); return folly::stringPrintf("STOP JOB %s", paras_[0].c_str()); - case meta::cpp2::AdminJobOp::RECOVER: + case meta::cpp2::JobOp::RECOVER: if (paras_.empty()) { return "RECOVER JOB"; } else { @@ -313,12 +313,12 @@ std::string AdminJobSentence::toString() const { LOG(FATAL) << "Unknown job operation " << static_cast(op_); } -meta::cpp2::AdminJobOp AdminJobSentence::getOp() const { +meta::cpp2::JobOp AdminJobSentence::getOp() const { return op_; } -meta::cpp2::AdminCmd AdminJobSentence::getCmd() const { - return cmd_; +meta::cpp2::JobType AdminJobSentence::getJobType() const { + return type_; } const std::vector &AdminJobSentence::getParas() const { diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index 298d8cfb365..2ece4c54405 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -659,10 +659,10 @@ class ShowListenerSentence final : public Sentence { class AdminJobSentence final : public Sentence { public: - explicit AdminJobSentence(meta::cpp2::AdminJobOp op, - meta::cpp2::AdminCmd cmd = meta::cpp2::AdminCmd::UNKNOWN) - : op_(op), cmd_(cmd) { - if (op == meta::cpp2::AdminJobOp::SHOW || op == meta::cpp2::AdminJobOp::SHOW_All) { + explicit AdminJobSentence(meta::cpp2::JobOp op, + meta::cpp2::JobType type = meta::cpp2::JobType::UNKNOWN) + : op_(op), type_(type) { + if (op == meta::cpp2::JobOp::SHOW || op == meta::cpp2::JobOp::SHOW_All) { kind_ = Kind::kAdminShowJobs; } else { kind_ = Kind::kAdminJob; @@ -672,13 +672,13 @@ class AdminJobSentence final : public Sentence { void addPara(const std::string& para); void addPara(const NameLabelList& NameLabelList); std::string toString() const override; - meta::cpp2::AdminJobOp getOp() const; - meta::cpp2::AdminCmd getCmd() const; + meta::cpp2::JobOp getOp() const; + meta::cpp2::JobType getJobType() const; const std::vector& getParas() const; private: - meta::cpp2::AdminJobOp op_; - meta::cpp2::AdminCmd cmd_; + meta::cpp2::JobOp op_; + meta::cpp2::JobType type_; std::vector paras_; }; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index ceb55afe744..6aaa82523f6 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -2776,36 +2776,36 @@ describe_edge_index_sentence rebuild_tag_index_sentence : KW_REBUILD KW_TAG KW_INDEX name_label_list { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::REBUILD_TAG_INDEX); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::REBUILD_TAG_INDEX); sentence->addPara(*$4); delete $4; $$ = sentence; } | KW_REBUILD KW_TAG KW_INDEX { - $$ = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::REBUILD_TAG_INDEX); + $$ = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::REBUILD_TAG_INDEX); } ; rebuild_edge_index_sentence : KW_REBUILD KW_EDGE KW_INDEX name_label_list { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::REBUILD_EDGE_INDEX); sentence->addPara(*$4); delete $4; $$ = sentence; } | KW_REBUILD KW_EDGE KW_INDEX { - $$ = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX); + $$ = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::REBUILD_EDGE_INDEX); } ; rebuild_fulltext_index_sentence : KW_REBUILD KW_FULLTEXT KW_INDEX { - $$ = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX); + $$ = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::REBUILD_FULLTEXT_INDEX); } ; @@ -3235,40 +3235,40 @@ ingest_sentence admin_job_sentence : KW_SUBMIT KW_JOB KW_COMPACT { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::COMPACT); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::COMPACT); $$ = sentence; } | KW_SUBMIT KW_JOB KW_FLUSH { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::FLUSH); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::FLUSH); $$ = sentence; } | KW_SUBMIT KW_JOB KW_STATS { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::STATS); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::STATS); $$ = sentence; } | KW_SHOW KW_JOBS { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::SHOW_All); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::SHOW_All); $$ = sentence; } | KW_SHOW KW_JOB legal_integer { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::SHOW); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::SHOW); sentence->addPara(std::to_string($3)); $$ = sentence; } | KW_STOP KW_JOB legal_integer { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::STOP); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::STOP); sentence->addPara(std::to_string($3)); $$ = sentence; } | KW_RECOVER KW_JOB { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::RECOVER); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::RECOVER); $$ = sentence; } | KW_RECOVER KW_JOB integer_list { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::RECOVER); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::RECOVER); std::vector*intVec=$3; for(int32_t i = 0; isize(); i++){ sentence->addPara(std::to_string(intVec->at(i))); @@ -3277,18 +3277,18 @@ admin_job_sentence $$ = sentence; } | KW_SUBMIT KW_JOB KW_BALANCE KW_LEADER { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::LEADER_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::LEADER_BALANCE); $$ = sentence; } | KW_SUBMIT KW_JOB KW_BALANCE KW_IN KW_ZONE { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::DATA_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::DATA_BALANCE); $$ = sentence; } | KW_SUBMIT KW_JOB KW_BALANCE KW_IN KW_ZONE KW_REMOVE host_list { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::DATA_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::DATA_BALANCE); HostList* hl = $7; std::vector has = hl->hosts(); for (HostAddr& ha: has) { @@ -3298,13 +3298,13 @@ admin_job_sentence $$ = sentence; } | KW_SUBMIT KW_JOB KW_BALANCE KW_ACROSS KW_ZONE { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::ZONE_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::ZONE_BALANCE); $$ = sentence; } | KW_SUBMIT KW_JOB KW_BALANCE KW_ACROSS KW_ZONE KW_REMOVE zone_name_list { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::ZONE_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::ZONE_BALANCE); ZoneNameList* nl = $7; std::vector names = nl->zoneNames(); for (std::string& name: names) { @@ -3745,19 +3745,19 @@ integer_list balance_sentence : KW_BALANCE KW_LEADER { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::LEADER_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::LEADER_BALANCE); $$ = sentence; } | KW_BALANCE KW_IN KW_ZONE { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::DATA_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::DATA_BALANCE); $$ = sentence; } | KW_BALANCE KW_IN KW_ZONE KW_REMOVE host_list { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::DATA_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::DATA_BALANCE); HostList* hl = $5; std::vector has = hl->hosts(); for (HostAddr& ha: has) { @@ -3767,13 +3767,13 @@ balance_sentence $$ = sentence; } | KW_BALANCE KW_ACROSS KW_ZONE { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::ZONE_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::ZONE_BALANCE); $$ = sentence; } | KW_BALANCE KW_ACROSS KW_ZONE KW_REMOVE zone_name_list { - auto sentence = new AdminJobSentence(meta::cpp2::AdminJobOp::ADD, - meta::cpp2::AdminCmd::ZONE_BALANCE); + auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD, + meta::cpp2::JobType::ZONE_BALANCE); ZoneNameList* nl = $5; std::vector names = nl->zoneNames(); for (std::string& name: names) { diff --git a/src/storage/admin/AdminTask.cpp b/src/storage/admin/AdminTask.cpp index 154c1795d10..90608c18f26 100644 --- a/src/storage/admin/AdminTask.cpp +++ b/src/storage/admin/AdminTask.cpp @@ -18,23 +18,23 @@ namespace storage { std::shared_ptr AdminTaskFactory::createAdminTask(StorageEnv* env, TaskContext&& ctx) { FLOG_INFO("%s (%d, %d)", __func__, ctx.jobId_, ctx.taskId_); std::shared_ptr ret; - switch (ctx.cmd_) { - case meta::cpp2::AdminCmd::COMPACT: + switch (ctx.jobType_) { + case meta::cpp2::JobType::COMPACT: ret = std::make_shared(env, std::move(ctx)); break; - case meta::cpp2::AdminCmd::FLUSH: + case meta::cpp2::JobType::FLUSH: ret = std::make_shared(env, std::move(ctx)); break; - case meta::cpp2::AdminCmd::REBUILD_TAG_INDEX: + case meta::cpp2::JobType::REBUILD_TAG_INDEX: ret = std::make_shared(env, std::move(ctx)); break; - case meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX: + case meta::cpp2::JobType::REBUILD_EDGE_INDEX: ret = std::make_shared(env, std::move(ctx)); break; - case meta::cpp2::AdminCmd::REBUILD_FULLTEXT_INDEX: + case meta::cpp2::JobType::REBUILD_FULLTEXT_INDEX: ret = std::make_shared(env, std::move(ctx)); break; - case meta::cpp2::AdminCmd::STATS: + case meta::cpp2::JobType::STATS: ret = std::make_shared(env, std::move(ctx)); break; default: diff --git a/src/storage/admin/AdminTask.h b/src/storage/admin/AdminTask.h index e8437d92006..8e2ebbcb46f 100644 --- a/src/storage/admin/AdminTask.h +++ b/src/storage/admin/AdminTask.h @@ -53,13 +53,13 @@ struct TaskContext { TaskContext() = default; TaskContext(const cpp2::AddTaskRequest& req, CallBack cb) - : cmd_(req.get_cmd()), + : jobType_(req.get_job_type()), jobId_(req.get_job_id()), taskId_(req.get_task_id()), parameters_(req.get_para()), onFinish_(cb) {} - nebula::meta::cpp2::AdminCmd cmd_; + nebula::meta::cpp2::JobType jobType_; JobID jobId_{-1}; TaskID taskId_{-1}; nebula::storage::cpp2::TaskPara parameters_; @@ -210,10 +210,10 @@ class AdminTask { /** * @brief Get admin job's command type. * - * @return meta::cpp2::AdminCmd job's command type. + * @return meta::cpp2::JobType job's command type. */ - meta::cpp2::AdminCmd cmdType() { - return ctx_.cmd_; + meta::cpp2::JobType jobType() { + return ctx_.jobType_; } public: diff --git a/src/storage/test/IndexWithTTLTest.cpp b/src/storage/test/IndexWithTTLTest.cpp index b82eb4c7fa8..6c3de4015a0 100644 --- a/src/storage/test/IndexWithTTLTest.cpp +++ b/src/storage/test/IndexWithTTLTest.cpp @@ -431,7 +431,7 @@ TEST(IndexWithTTLTest, RebuildTagIndexWithTTL) { parameter.task_specific_paras_ref() = {"2021002"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; request.para_ref() = std::move(parameter); @@ -500,7 +500,7 @@ TEST(IndexWithTTLTest, RebuildEdgeIndexWithTTL) { parameter.task_specific_paras_ref() = {"2021002"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; request.para_ref() = std::move(parameter); @@ -571,7 +571,7 @@ TEST(IndexWithTTLTest, RebuildTagIndexWithTTLExpired) { parameter.task_specific_paras_ref() = {"2021002"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; request.para_ref() = std::move(parameter); @@ -642,7 +642,7 @@ TEST(IndexWithTTLTest, RebuildEdgeIndexWithTTLExpired) { parameter.task_specific_paras_ref() = {"2021002"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 15; request.para_ref() = std::move(parameter); diff --git a/src/storage/test/RebuildIndexTest.cpp b/src/storage/test/RebuildIndexTest.cpp index 5b2c456b7d0..e2b9737545e 100644 --- a/src/storage/test/RebuildIndexTest.cpp +++ b/src/storage/test/RebuildIndexTest.cpp @@ -78,7 +78,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexCheckALLData) { parameter.task_specific_paras_ref() = {"4", "5"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; request.para_ref() = std::move(parameter); @@ -165,7 +165,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexCheckALLData) { parameter.task_specific_paras_ref() = {"103", "104"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 16; request.para_ref() = std::move(parameter); @@ -262,7 +262,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithDelete) { parameter.task_specific_paras_ref() = {"4", "5"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 11; request.para_ref() = std::move(parameter); @@ -323,7 +323,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndexWithAppend) { parameter.task_specific_paras_ref() = {"4", "5"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 12; request.para_ref() = std::move(parameter); @@ -367,7 +367,7 @@ TEST_F(RebuildIndexTest, RebuildTagIndex) { parameter.parts_ref() = std::move(parts); cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_TAG_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_TAG_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; parameter.task_specific_paras_ref() = {"4", "5"}; @@ -423,7 +423,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithDelete) { parameter.task_specific_paras_ref() = {"103", "104"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 14; request.para_ref() = std::move(parameter); @@ -485,7 +485,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndexWithAppend) { parameter.task_specific_paras_ref() = {"103", "104"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 15; request.para_ref() = std::move(parameter); @@ -529,7 +529,7 @@ TEST_F(RebuildIndexTest, RebuildEdgeIndex) { parameter.task_specific_paras_ref() = {"103", "104"}; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::REBUILD_EDGE_INDEX; + request.job_type_ref() = meta::cpp2::JobType::REBUILD_EDGE_INDEX; request.job_id_ref() = ++gJobId; request.task_id_ref() = 16; request.para_ref() = std::move(parameter); diff --git a/src/storage/test/StatsTaskTest.cpp b/src/storage/test/StatsTaskTest.cpp index 2c98e30825b..803a04b4d08 100644 --- a/src/storage/test/StatsTaskTest.cpp +++ b/src/storage/test/StatsTaskTest.cpp @@ -69,7 +69,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) { parameter.parts_ref() = parts; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::STATS; + request.job_type_ref() = meta::cpp2::JobType::STATS; request.job_id_ref() = ++gJobId; request.task_id_ref() = 13; request.para_ref() = std::move(parameter); @@ -134,7 +134,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) { parameter.parts_ref() = parts; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::STATS; + request.job_type_ref() = meta::cpp2::JobType::STATS; request.job_id_ref() = ++gJobId; request.task_id_ref() = 14; request.para_ref() = std::move(parameter); @@ -205,7 +205,7 @@ TEST_F(StatsTaskTest, StatsTagAndEdgeData) { parameter.parts_ref() = parts; cpp2::AddTaskRequest request; - request.cmd_ref() = meta::cpp2::AdminCmd::STATS; + request.job_type_ref() = meta::cpp2::JobType::STATS; request.job_id_ref() = ++gJobId; request.task_id_ref() = 15; request.para_ref() = std::move(parameter);