From ae7e2fe5c97d11bf6095d9e9d561208321baa273 Mon Sep 17 00:00:00 2001 From: "pengwei.song" <90180021+pengweisong@users.noreply.github.com> Date: Fri, 18 Nov 2022 09:02:56 +0800 Subject: [PATCH] =?UTF-8?q?remove=20job=20entries=20when=20drop=20space=20?= =?UTF-8?q?&&=20check=20space=20exist=20when=20check=20jo=E2=80=A6=20(#488?= =?UTF-8?q?8)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * remove job entries when drop space && check space exist when check job running * fix bug Co-authored-by: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/meta/processors/job/JobManager.cpp | 26 +++++++++++++++++++ src/meta/processors/job/JobManager.h | 7 +++++ .../processors/job/ReportTaskProcessor.cpp | 2 ++ .../processors/parts/DropSpaceProcessor.cpp | 17 ++++++++++++ 4 files changed, 52 insertions(+) diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index cddd7952f9e..ee7e0a3b54a 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -65,6 +65,16 @@ JobManager::~JobManager() { shutDown(); } +bool JobManager::spaceExist(GraphSpaceID spaceId) { + auto spaceKey = MetaKeyUtils::spaceKey(spaceId); + std::string val; + auto retCode = kvStore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val); + if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + return false; + } + return true; +} + nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() { std::unique_ptr iter; auto retCode = @@ -971,6 +981,7 @@ ErrorOr JobManager::checkTypeJobRunning( return retCode; } + std::unordered_map spaceExistCache; for (; iter->valid(); iter->next()) { auto jobKey = iter->key(); if (MetaKeyUtils::isJobKey(jobKey)) { @@ -984,6 +995,21 @@ ErrorOr JobManager::checkTypeJobRunning( continue; } + // In some corner cases, there maybe job entry exist but the space has been dropped + auto spaceId = jobDesc.getSpace(); + if (spaceExistCache.find(spaceId) != spaceExistCache.end()) { + if (!spaceExistCache[spaceId]) { + continue; + } + } else { + if (!spaceExist(spaceId)) { + spaceExistCache[spaceId] = false; + continue; + } else { + spaceExistCache[spaceId] = true; + } + } + auto status = jobDesc.getStatus(); if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING) { return true; diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 28550647006..be85d9cecc9 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -300,6 +300,13 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { */ void resetSpaceRunning(GraphSpaceID spaceId); + /** + * @brief check if the space exist + * + * @return + */ + bool spaceExist(GraphSpaceID spaceId); + private: using PriorityQueue = folly::PriorityUMPSCQueueSet, true>; // Each PriorityQueue contains high and low priority queues. diff --git a/src/meta/processors/job/ReportTaskProcessor.cpp b/src/meta/processors/job/ReportTaskProcessor.cpp index 56bfe372734..25c523f8f22 100644 --- a/src/meta/processors/job/ReportTaskProcessor.cpp +++ b/src/meta/processors/job/ReportTaskProcessor.cpp @@ -13,6 +13,8 @@ namespace meta { #include "meta/processors/job/JobManager.h" void ReportTaskProcessor::process(const cpp2::ReportTaskReq& req) { + CHECK_SPACE_ID_AND_RETURN(req.get_space_id()); + JobManager* jobMgr = JobManager::getInstance(); auto rc = jobMgr->reportTaskFinish(req); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { diff --git a/src/meta/processors/parts/DropSpaceProcessor.cpp b/src/meta/processors/parts/DropSpaceProcessor.cpp index 3f758724d10..f6f239f50a1 100644 --- a/src/meta/processors/parts/DropSpaceProcessor.cpp +++ b/src/meta/processors/parts/DropSpaceProcessor.cpp @@ -129,6 +129,23 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { auto localIdkey = MetaKeyUtils::localIdKey(spaceId); batchHolder->remove(std::move(localIdkey)); + // 8. Delete all job data + auto jobPrefix = MetaKeyUtils::jobPrefix(spaceId); + auto jobRet = doPrefix(jobPrefix); + if (!nebula::ok(jobRet)) { + auto retCode = nebula::error(jobRet); + LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode); + handleErrorCode(retCode); + onFinished(); + return; + } + + auto jobIter = nebula::value(jobRet).get(); + while (jobIter->valid()) { + batchHolder->remove(jobIter->key().str()); + jobIter->next(); + } + auto timeInMilliSec = time::WallClock::fastNowInMilliSec(); LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec); auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());