Skip to content

Commit

Permalink
Fix update sessions when leader change happens (#5225)
Browse files Browse the repository at this point in the history
* Fix udpate sessions when leader change happens

* Handle errors on the graph side

* Address comments

* Address comments
  • Loading branch information
Aiee authored and Sophie-Xie committed Jan 28, 2023
1 parent 2f0ccaa commit 70e7864
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
13 changes: 10 additions & 3 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "graph/session/GraphSessionManager.h"

#include "common/base/Base.h"
#include "common/base/Status.h"
#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
#include "graph/service/GraphFlags.h"
Expand Down Expand Up @@ -258,8 +259,9 @@ void GraphSessionManager::updateSessionsToMeta() {
auto handleKilledQueries = [this](auto&& resp) {
if (!resp.ok()) {
LOG(ERROR) << "Update sessions failed: " << resp.status();
return Status::Error("Update sessions failed: %s", resp.status().toString().c_str());
return;
}

auto& killedQueriesForEachSession = *resp.value().killed_queries_ref();
for (auto& killedQueries : killedQueriesForEachSession) {
auto sessionId = killedQueries.first;
Expand All @@ -276,19 +278,24 @@ void GraphSessionManager::updateSessionsToMeta() {
VLOG(1) << "Kill query, session: " << sessionId << " plan: " << epId;
}
}
return Status::OK();
};

// The response from meta contains sessions that are marked as killed, so we need to clean the
// local cache and update statistics
auto handleKilledSessions = [this](auto&& resp) {
if (!resp.ok()) {
LOG(ERROR) << "Update sessions failed: " << resp.status();
return;
}

auto killSessions = resp.value().get_killed_sessions();
removeSessionFromLocalCache(killSessions);
};

auto result = metaClient_->updateSessions(sessions).get();
if (!result.ok()) {
LOG(ERROR) << "Update sessions failed: " << result;
LOG(ERROR) << "Update sessions failed: " << result.status();
return;
}
handleKilledQueries(result);
handleKilledSessions(result);
Expand Down
20 changes: 16 additions & 4 deletions src/meta/processors/session/SessionManagerProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) {
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(INFO) << "Session id '" << sessionId << "' not found";
// If the session requested to be updated can not be found in meta, the session has been
// killed
// If the session requested to be updated can not be found in meta, we consider the session
// has been killed
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
killedSessions.emplace_back(sessionId);
continue;
} else {
handleErrorCode(errCode);
onFinished();
return;
}
}

Expand Down Expand Up @@ -169,10 +173,18 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);

// If the session is not found, we should continue to remove other sessions.
if (!nebula::ok(ret)) {
auto errCode = nebula::error(ret);
LOG(INFO) << "Session id `" << sessionId << "' not found";
continue;

// If the session is not found, we should continue to remove other sessions.
if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
continue;
} else { // for other error like leader change, we handle the error and return.
handleErrorCode(errCode);
onFinished();
return;
}
}

// Remove session key from kvstore
Expand Down

0 comments on commit 70e7864

Please sign in to comment.