Skip to content

Commit

Permalink
remove job entries when drop space && check space exist when check jo… (
Browse files Browse the repository at this point in the history
#4888)

* remove job entries when drop space && check space exist when check job running

* fix bug

Co-authored-by: panda-sheep <59197347+panda-sheep@users.noreply.github.com>
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 18, 2022
1 parent 6f240e3 commit ae7e2fe
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ JobManager::~JobManager() {
shutDown();
}

bool JobManager::spaceExist(GraphSpaceID spaceId) {
auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
std::string val;
auto retCode = kvStore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val);
if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return false;
}
return true;
}

nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode =
Expand Down Expand Up @@ -971,6 +981,7 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> JobManager::checkTypeJobRunning(
return retCode;
}

std::unordered_map<GraphSpaceID, bool> spaceExistCache;
for (; iter->valid(); iter->next()) {
auto jobKey = iter->key();
if (MetaKeyUtils::isJobKey(jobKey)) {
Expand All @@ -984,6 +995,21 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> JobManager::checkTypeJobRunning(
continue;
}

// In some corner cases, there maybe job entry exist but the space has been dropped
auto spaceId = jobDesc.getSpace();
if (spaceExistCache.find(spaceId) != spaceExistCache.end()) {
if (!spaceExistCache[spaceId]) {
continue;
}
} else {
if (!spaceExist(spaceId)) {
spaceExistCache[spaceId] = false;
continue;
} else {
spaceExistCache[spaceId] = true;
}
}

auto status = jobDesc.getStatus();
if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING) {
return true;
Expand Down
7 changes: 7 additions & 0 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
*/
void resetSpaceRunning(GraphSpaceID spaceId);

/**
* @brief check if the space exist
*
* @return
*/
bool spaceExist(GraphSpaceID spaceId);

private:
using PriorityQueue = folly::PriorityUMPSCQueueSet<std::tuple<JbOp, JobID, GraphSpaceID>, true>;
// Each PriorityQueue contains high and low priority queues.
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/job/ReportTaskProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace meta {
#include "meta/processors/job/JobManager.h"

void ReportTaskProcessor::process(const cpp2::ReportTaskReq& req) {
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());

JobManager* jobMgr = JobManager::getInstance();
auto rc = jobMgr->reportTaskFinish(req);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
17 changes: 17 additions & 0 deletions src/meta/processors/parts/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,23 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
auto localIdkey = MetaKeyUtils::localIdKey(spaceId);
batchHolder->remove(std::move(localIdkey));

// 8. Delete all job data
auto jobPrefix = MetaKeyUtils::jobPrefix(spaceId);
auto jobRet = doPrefix(jobPrefix);
if (!nebula::ok(jobRet)) {
auto retCode = nebula::error(jobRet);
LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
}

auto jobIter = nebula::value(jobRet).get();
while (jobIter->valid()) {
batchHolder->remove(jobIter->key().str());
jobIter->next();
}

auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec);
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
Expand Down

0 comments on commit ae7e2fe

Please sign in to comment.