Skip to content

Commit

Permalink
Refactor removeSessionFromLocalCache()
Browse files Browse the repository at this point in the history
  • Loading branch information
Aiee authored and xtcyclist committed Dec 29, 2022
1 parent f48547f commit 2e72f13
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 63 deletions.
91 changes: 38 additions & 53 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,7 @@ folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::cre
}

void GraphSessionManager::removeSession(SessionID id) {
auto iter = activeSessions_.find(id);
if (iter == activeSessions_.end()) {
return;
}

// Before removing the session, all queries on the session
// need to be marked as killed.
iter->second->markAllQueryKilled();
auto resp = metaClient_->removeSessions({id}).get();
if (!resp.ok()) {
// it will delete by reclaim
LOG(ERROR) << "Remove session `" << id << "' failed: " << resp.status();
return;
}

removeSessionFromLocalCache(iter->second->getSession());
activeSessions_.erase(iter);
removeMultiSessions({id});
}

int32_t GraphSessionManager::removeMultiSessions(const std::vector<SessionID>& ids) {
Expand All @@ -192,17 +176,10 @@ int32_t GraphSessionManager::removeMultiSessions(const std::vector<SessionID>& i
return -1;
}

for (auto& id : ids) {
auto iter = activeSessions_.find(id);

// if the session is not in the current graph, ignore it
if (iter == activeSessions_.end()) {
continue;
}
auto killedSessions = resp.value().get_removed_session_ids();

removeSessionFromLocalCache(iter->second->getSession());
}
return resp.value().get_removed_session_num();
removeSessionFromLocalCache(killedSessions);
return killedSessions.size();
}

void GraphSessionManager::threadFunc() {
Expand All @@ -227,37 +204,36 @@ void GraphSessionManager::reclaimExpiredSessions() {
}

FVLOG3("Try to reclaim expired sessions out of %lu ones", activeSessions_.size());
auto iter = activeSessions_.begin();
auto end = activeSessions_.end();
std::vector<SessionID> expiredSessions;

while (iter != end) {
int32_t idleSecs = iter->second->idleSeconds();
VLOG(2) << "SessionId: " << iter->first << ", idleSecs: " << idleSecs;
// collect expired sessions
for (const auto& iter : activeSessions_) {
int32_t idleSecs = iter.second->idleSeconds();
VLOG(2) << "SessionId: " << iter.first << ", idleSecs: " << idleSecs;
if (idleSecs < FLAGS_session_idle_timeout_secs) {
++iter;
continue;
}
FLOG_INFO("ClientSession %ld has expired", iter->first);

// collect expired sessions
expiredSessions.emplace_back(iter->first);
FLOG_INFO("ClientSession %ld has expired", iter.first);

removeSessionFromLocalCache(iter->second->getSession());
stats::StatsManager::addValue(kNumReclaimedExpiredSessions);
iter = activeSessions_.erase(iter);
expiredSessions.emplace_back(iter.first);
// TODO: Disconnect the connection of the session
}

// Remove expired sessions from meta server
if (expiredSessions.empty()) {
return;
}

auto resp = metaClient_->removeSessions(expiredSessions).get();
auto resp = metaClient_->removeSessions(std::move(expiredSessions)).get();
if (!resp.ok()) {
// TODO: Handle cases where the delete client failed
LOG(ERROR) << "Remove session failed: " << resp.status();
return;
}

auto killedSessions = resp.value().get_removed_session_ids();
// Remove expired sessions from local cache
removeSessionFromLocalCache(killedSessions);
}

void GraphSessionManager::updateSessionsToMeta() {
Expand Down Expand Up @@ -308,13 +284,7 @@ void GraphSessionManager::updateSessionsToMeta() {
// local cache and update statistics
auto handleKilledSessions = [this](auto&& resp) {
auto killSessions = resp.value().get_killed_sessions();
for (auto id : killSessions) {
auto iter = activeSessions_.find(id);
if (iter == activeSessions_.end()) {
continue;
}
removeSessionFromLocalCache(iter->second->getSession());
}
removeSessionFromLocalCache(killSessions);
};

auto result = metaClient_->updateSessions(sessions).get();
Expand Down Expand Up @@ -372,12 +342,27 @@ Status GraphSessionManager::init() {
return Status::OK();
}

void GraphSessionManager::removeSessionFromLocalCache(const meta::cpp2::Session& session) {
std::string key = session.get_user_name() + session.get_client_ip();
void GraphSessionManager::removeSessionFromLocalCache(const std::vector<SessionID>& ids) {
for (auto& id : ids) {
// if the session is not in the current graph, ignore it
auto iter = activeSessions_.find(id);
if (iter == activeSessions_.end()) {
continue;
}
auto sessionPtr = iter->second;
activeSessions_.erase(iter);

// All queries on the session need to be marked as killed.
sessionPtr->markAllQueryKilled();

// delete session count from cache
subSessionCount(key);
stats::StatsManager::decValue(kNumActiveSessions);
// delete session count from cache
std::string key =
sessionPtr->getSession().get_user_name() + sessionPtr->getSession().get_client_ip();
subSessionCount(key);

// update stats
stats::StatsManager::decValue(kNumActiveSessions);
}
}

bool GraphSessionManager::addSessionCount(std::string& key) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/session/GraphSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class GraphSessionManager final : public SessionManager<ClientSession> {

// Removes a session from the local cache.
// All queries within the expired session will be marked as killed and stats will be updated.
void removeSessionFromLocalCache(const meta::cpp2::Session& session);
void removeSessionFromLocalCache(const std::vector<SessionID>& ids);

// Reclaims expired sessions.
// All queries within the expired session will be marked as killed.
Expand Down
6 changes: 3 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1141,9 +1141,9 @@ struct RemoveSessionReq {
}

struct RemoveSessionResp {
1: common.ErrorCode code,
2: common.HostAddr leader,
3: i32 removed_session_num,
1: common.ErrorCode code,
2: common.HostAddr leader,
3: list<common.SessionID> removed_session_ids,
}

struct KillQueryReq {
Expand Down
32 changes: 26 additions & 6 deletions src/meta/processors/session/SessionManagerProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,9 @@ void GetSessionProcessor::process(const cpp2::GetSessionReq& req) {

void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
auto removedSessionNum = 0;
std::vector<SessionID> killedSessions;

auto sessionIds = req.get_session_ids();
std::vector<std::string> keys;

for (auto sessionId : sessionIds) {
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
Expand All @@ -175,12 +174,33 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
LOG(INFO) << "Session id `" << sessionId << "' not found";
continue;
}
keys.emplace_back(sessionKey);
++removedSessionNum;

// Remove session key from kvstore
folly::Baton<true, std::atomic> baton;
nebula::cpp2::ErrorCode errorCode;
kvstore_->asyncRemove(kDefaultSpaceId,
kDefaultPartId,
sessionKey,
[this, &baton, &errorCode](nebula::cpp2::ErrorCode code) {
this->handleErrorCode(code);
errorCode = code;
baton.post();
});
baton.wait();

// continue if the session is not removed successfully
if (errorCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Remove session key failed, error code: " << static_cast<int32_t>(errorCode);
continue;
}

// record the removed session id
killedSessions.emplace_back(sessionId);
}

handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);
resp_.removed_session_num_ref() = removedSessionNum;
doMultiRemove(std::move(keys));
resp_.removed_session_ids_ref() = std::move(killedSessions);
onFinished();
}

void KillQueryProcessor::process(const cpp2::KillQueryReq& req) {
Expand Down
3 changes: 3 additions & 0 deletions tests/job/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,6 @@ def test_kill_session_basic(self):
# wrong type of session id
resp = self.execute('KILL SESSION "123"')
self.check_resp_failed(resp, ttypes.ErrorCode.E_SEMANTIC_ERROR)

# def test_kill_session_multi_graph(self):
# # kill a session in another host

0 comments on commit 2e72f13

Please sign in to comment.