From e15c7451c7f726f6a1e89c11077cf44d50590b9d Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Wed, 16 Nov 2022 17:05:17 +0800 Subject: [PATCH 1/2] remove job entries when drop space && check space exist when check job running --- src/meta/processors/job/JobManager.cpp | 16 ++++++++++++++++ src/meta/processors/job/JobManager.h | 15 +++++++++++++++ src/meta/processors/job/ReportTaskProcessor.cpp | 2 ++ .../processors/parts/DropSpaceProcessor.cpp | 17 +++++++++++++++++ 4 files changed, 50 insertions(+) diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 12ff42bb852..2bace32dc69 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -972,6 +972,7 @@ ErrorOr JobManager::checkTypeJobRunning( return retCode; } + std::unordered_map spaceExistCache; for (; iter->valid(); iter->next()) { auto jobKey = iter->key(); if (MetaKeyUtils::isJobKey(jobKey)) { @@ -985,6 +986,21 @@ ErrorOr JobManager::checkTypeJobRunning( continue; } + // In some corner cases, there maybe job entry exist but the space has been dropped + auto spaceId = jobDesc.getSpace(); + if (spaceExistCache.find(spaceId) != spaceExistCache.end()) { + if (!spaceExistCache[spaceId]) { + continue; + } + } else { + if (!spaceExist(spaceId)) { + spaceExistCache[spaceId] = false; + continue; + } else { + spaceExistCache[spaceId] = true; + } + } + auto status = jobDesc.getStatus(); if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING) { return true; diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 28550647006..7f65e2f428d 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -300,6 +300,21 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { */ void resetSpaceRunning(GraphSpaceID spaceId); + /** + * @brief check if the space exist + * + * @return + */ + bool spaceExist(GraphSpaceID spaceId) { + auto spaceKey = MetaKeyUtils::spaceKey(spaceId); + std::string val; + auto retCode = kvStore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val); + if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + return false; + } + return true; + } + private: using PriorityQueue = folly::PriorityUMPSCQueueSet, true>; // Each PriorityQueue contains high and low priority queues. diff --git a/src/meta/processors/job/ReportTaskProcessor.cpp b/src/meta/processors/job/ReportTaskProcessor.cpp index 56bfe372734..25c523f8f22 100644 --- a/src/meta/processors/job/ReportTaskProcessor.cpp +++ b/src/meta/processors/job/ReportTaskProcessor.cpp @@ -13,6 +13,8 @@ namespace meta { #include "meta/processors/job/JobManager.h" void ReportTaskProcessor::process(const cpp2::ReportTaskReq& req) { + CHECK_SPACE_ID_AND_RETURN(req.get_space_id()); + JobManager* jobMgr = JobManager::getInstance(); auto rc = jobMgr->reportTaskFinish(req); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { diff --git a/src/meta/processors/parts/DropSpaceProcessor.cpp b/src/meta/processors/parts/DropSpaceProcessor.cpp index 3f758724d10..b2a44dec34a 100644 --- a/src/meta/processors/parts/DropSpaceProcessor.cpp +++ b/src/meta/processors/parts/DropSpaceProcessor.cpp @@ -129,6 +129,23 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { auto localIdkey = MetaKeyUtils::localIdKey(spaceId); batchHolder->remove(std::move(localIdkey)); + // 8. Delete all job data + auto jobPrefix = MetaKeyUtils::jobPrefix(spaceId); + auto jobRet = doPrefix(jobPrefix); + if (!nebula::ok(jobRet)) { + auto retCode = nebula::error(jobRet); + LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(jobRet); + handleErrorCode(retCode); + onFinished(); + return; + } + + auto jobIter = nebula::value(jobRet).get(); + while (jobIter->valid()) { + batchHolder->remove(jobIter->key().str()); + jobIter->next(); + } + auto timeInMilliSec = time::WallClock::fastNowInMilliSec(); LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec); auto batch = encodeBatchValue(std::move(batchHolder)->getBatch()); From ec8c826ff8cd284422c43824ab9448116c40f079 Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Wed, 16 Nov 2022 18:34:46 +0800 Subject: [PATCH 2/2] fix bug --- src/meta/processors/job/JobManager.cpp | 10 ++++++++++ src/meta/processors/job/JobManager.h | 10 +--------- src/meta/processors/parts/DropSpaceProcessor.cpp | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 2bace32dc69..4112f98494d 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -66,6 +66,16 @@ JobManager::~JobManager() { shutDown(); } +bool JobManager::spaceExist(GraphSpaceID spaceId) { + auto spaceKey = MetaKeyUtils::spaceKey(spaceId); + std::string val; + auto retCode = kvStore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val); + if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + return false; + } + return true; +} + nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() { std::unique_ptr iter; auto retCode = diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 7f65e2f428d..be85d9cecc9 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -305,15 +305,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { * * @return */ - bool spaceExist(GraphSpaceID spaceId) { - auto spaceKey = MetaKeyUtils::spaceKey(spaceId); - std::string val; - auto retCode = kvStore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val); - if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { - return false; - } - return true; - } + bool spaceExist(GraphSpaceID spaceId); private: using PriorityQueue = folly::PriorityUMPSCQueueSet, true>; diff --git a/src/meta/processors/parts/DropSpaceProcessor.cpp b/src/meta/processors/parts/DropSpaceProcessor.cpp index b2a44dec34a..f6f239f50a1 100644 --- a/src/meta/processors/parts/DropSpaceProcessor.cpp +++ b/src/meta/processors/parts/DropSpaceProcessor.cpp @@ -134,7 +134,7 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { auto jobRet = doPrefix(jobPrefix); if (!nebula::ok(jobRet)) { auto retCode = nebula::error(jobRet); - LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(jobRet); + LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); onFinished(); return;