Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove job entries when drop space && check space exist when check jo… #4888

Merged
merged 6 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ JobManager::~JobManager() {
shutDown();
}

bool JobManager::spaceExist(GraphSpaceID spaceId) {
auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
std::string val;
auto retCode = kvStore_->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &val);
if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return false;
}
return true;
}

nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode =
Expand Down Expand Up @@ -972,6 +982,7 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> JobManager::checkTypeJobRunning(
return retCode;
}

std::unordered_map<GraphSpaceID, bool> spaceExistCache;
for (; iter->valid(); iter->next()) {
auto jobKey = iter->key();
if (MetaKeyUtils::isJobKey(jobKey)) {
Expand All @@ -985,6 +996,21 @@ ErrorOr<nebula::cpp2::ErrorCode, bool> JobManager::checkTypeJobRunning(
continue;
}

// In some corner cases, there maybe job entry exist but the space has been dropped
auto spaceId = jobDesc.getSpace();
if (spaceExistCache.find(spaceId) != spaceExistCache.end()) {
if (!spaceExistCache[spaceId]) {
continue;
}
} else {
if (!spaceExist(spaceId)) {
spaceExistCache[spaceId] = false;
continue;
} else {
spaceExistCache[spaceId] = true;
}
}

auto status = jobDesc.getStatus();
if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING) {
return true;
Expand Down
7 changes: 7 additions & 0 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
*/
void resetSpaceRunning(GraphSpaceID spaceId);

/**
* @brief check if the space exist
*
* @return
*/
bool spaceExist(GraphSpaceID spaceId);

private:
using PriorityQueue = folly::PriorityUMPSCQueueSet<std::tuple<JbOp, JobID, GraphSpaceID>, true>;
// Each PriorityQueue contains high and low priority queues.
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/job/ReportTaskProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace meta {
#include "meta/processors/job/JobManager.h"

void ReportTaskProcessor::process(const cpp2::ReportTaskReq& req) {
CHECK_SPACE_ID_AND_RETURN(req.get_space_id());

JobManager* jobMgr = JobManager::getInstance();
auto rc = jobMgr->reportTaskFinish(req);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
17 changes: 17 additions & 0 deletions src/meta/processors/parts/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,23 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
auto localIdkey = MetaKeyUtils::localIdKey(spaceId);
batchHolder->remove(std::move(localIdkey));

// 8. Delete all job data
auto jobPrefix = MetaKeyUtils::jobPrefix(spaceId);
auto jobRet = doPrefix(jobPrefix);
if (!nebula::ok(jobRet)) {
auto retCode = nebula::error(jobRet);
LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
}

auto jobIter = nebula::value(jobRet).get();
while (jobIter->valid()) {
batchHolder->remove(jobIter->key().str());
jobIter->next();
}

auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec);
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
Expand Down