Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rwjs/unique sids #1049

Merged
merged 26 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8701b3d
Added a static SessionManager share_ptr to the OnMessageTask class so…
robsimmonds Mar 11, 2022
9dadc25
Set the pointer to the SessionManager for the OnMessageTask objects.
robsimmonds Mar 11, 2022
6a04afa
Updated some logging messages.
robsimmonds Mar 11, 2022
cd9f046
Removed a test in the excution loop that was not needed.
robsimmonds Mar 11, 2022
0f9aed7
Updated some logging.
robsimmonds Mar 11, 2022
8a80ff9
Added shared_ptr to SessionManager to enable sessions to be removed
robsimmonds Mar 11, 2022
f21e0f9
Made _num_sessions counter volatile.
robsimmonds Mar 11, 2022
45f8732
Changed way that session_id is generated in OnUpgrade. This has not f…
robsimmonds Mar 11, 2022
999aef2
Updated logging to be more descriptive.
robsimmonds Mar 11, 2022
dce1d08
Merge branch 'dev' into rwjs/session_fix_m11
robsimmonds Mar 11, 2022
62a0398
Merge branch 'dev' into rwjs/unique_sids
robsimmonds Mar 11, 2022
f4bb3f4
Added check before using the shared_ptr to the SessionManager. Turns
robsimmonds Mar 11, 2022
4d73dd3
Merge branch 'rwjs/session_fix_m11' into rwjs/unique_sids
robsimmonds Mar 11, 2022
d7ce3cc
Changed from a short to a long parameter name.
robsimmonds Mar 11, 2022
bd9c73c
Merge branch 'rwjs/session_fix_m11' into rwjs/unique_sids
robsimmonds Mar 11, 2022
3f5c3d9
Fixed the type of the parameter to DeleteSession.
robsimmonds Mar 11, 2022
08d28e8
Remove unused method.
robsimmonds Mar 12, 2022
36ef101
Shorter timeout on exit.
robsimmonds Mar 14, 2022
f0a6439
Merge branch 'rwjs/session_fix_m11' into rwjs/unique_sids
robsimmonds Mar 15, 2022
99f253f
Added a test before deleting session.
robsimmonds Mar 15, 2022
15423b8
Merge branch 'dev' into rwjs/session_fix_m11
robsimmonds Mar 15, 2022
5eb38da
Merge branch 'dev' into rwjs/unique_sids
robsimmonds Mar 15, 2022
aeb20ec
Merge branch 'rwjs/session_fix_m11' into rwjs/unique_sids
robsimmonds Mar 15, 2022
0eda9ec
Changed to C++ way of geting microsecond time to use for the Session ID.
robsimmonds Mar 15, 2022
aceba4e
Format fix.
robsimmonds Mar 15, 2022
a1d1ba3
Merge branch 'dev' into rwjs/unique_sids
robsimmonds Mar 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Main/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ int main(int argc, char* argv[]) {

// Session manager
session_manager = make_shared<SessionManager>(settings, auth_token, file_list_handler);
carta::OnMessageTask::SetSessionManager(session_manager);

// HTTP server
if (!settings.no_frontend || !settings.no_database || settings.enable_scripting) {
Expand Down
2 changes: 2 additions & 0 deletions src/Session/OnMessageTask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

using namespace carta;

std::shared_ptr<SessionManager> OnMessageTask::_session_manager;

OnMessageTask* SetImageChannelsTask::execute() {
std::pair<CARTA::SetImageChannels, uint32_t> request_pair;
bool tester;
Expand Down
13 changes: 12 additions & 1 deletion src/Session/OnMessageTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

#include "AnimationObject.h"
#include "Session.h"
#include "SessionManager.h"
#include "Util/Message.h"

namespace carta {

class OnMessageTask {
private:
static std::shared_ptr<SessionManager> _session_manager;

protected:
Session* _session;

Expand All @@ -31,10 +35,17 @@ class OnMessageTask {
}
virtual ~OnMessageTask() {
if (!_session->DecreaseRefCount()) {
delete _session;
spdlog::info("({}) Remove Session {} in ~OMT", fmt::ptr(_session), _session->GetId());
// Test here since the CARTA test system does not set this shared_ptr for all tests.
if (_session_manager) {
_session_manager->DeleteSession(_session->GetId());
}
}
_session = nullptr;
}
static void SetSessionManager(shared_ptr<SessionManager>& session_manager) {
_session_manager = session_manager;
}
virtual OnMessageTask* execute() = 0;
};

Expand Down
17 changes: 9 additions & 8 deletions src/Session/Session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void LoaderCache::Remove(const std::string& filename) {
_queue.remove(filename);
}

int Session::_num_sessions = 0;
volatile int Session::_num_sessions = 0;
int Session::_exit_after_num_seconds = 5;
bool Session::_exit_when_all_sessions_closed = false;
std::thread* Session::_animation_thread = nullptr;
Expand Down Expand Up @@ -121,7 +121,7 @@ Session::Session(uWS::WebSocket<false, true, PerSocketData>* ws, uWS::Loop* loop
_connected = true;
++_num_sessions;
UpdateLastMessageTimestamp();
spdlog::debug("{} ::Session ({})", fmt::ptr(this), _num_sessions);
spdlog::info("{} ::Session ({}:{})", fmt::ptr(this), _id, _num_sessions);
}

static int __exit_backend_timer = 0;
Expand All @@ -147,17 +147,17 @@ void ExitNoSessions(int s) {

Session::~Session() {
--_num_sessions;
spdlog::debug("{} ~Session {}", fmt::ptr(this), _num_sessions);
spdlog::debug("{} ~Session : num sessions = {}", fmt::ptr(this), _num_sessions);
if (!_num_sessions) {
spdlog::info("No remaining sessions.");
if (_exit_when_all_sessions_closed) {
if (_exit_after_num_seconds == 0) {
spdlog::info("Exiting due to no sessions remaining");
ThreadManager::ExitEventHandlingThreads();
spdlog::debug("Exiting due to no sessions remaining");
logger::FlushLogFile();
exit(0);
__exit_backend_timer = 1;
} else {
__exit_backend_timer = _exit_after_num_seconds;
}
__exit_backend_timer = _exit_after_num_seconds;
struct sigaction sig_handler;
sig_handler.sa_handler = ExitNoSessions;
sigemptyset(&sig_handler.sa_mask);
Expand Down Expand Up @@ -363,6 +363,7 @@ void Session::OnRegisterViewer(const CARTA::RegisterViewer& message, uint16_t ic
} else {
type = CARTA::SessionType::RESUMED;
if (session_id != _id) {
spdlog::info("({}) Session setting id to {} (was {}) on resume", fmt::ptr(this), session_id, _id);
_id = session_id;
status = fmt::format("Start a new backend and assign it with session id {}", session_id);
} else {
Expand Down Expand Up @@ -1083,7 +1084,7 @@ void Session::OnSetContourParameters(const CARTA::SetContourParameters& message,

void Session::OnResumeSession(const CARTA::ResumeSession& message, uint32_t request_id) {
bool success(true);
spdlog::info("Client {} [{}] Resumed.", GetId(), GetAddress());
spdlog::info("Session {} [{}] Resumed.", GetId(), GetAddress());

// Error messages
std::string err_message;
Expand Down
5 changes: 1 addition & 4 deletions src/Session/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,6 @@ class Session {
void SendFileEvent(
int file_id, CARTA::EventType event_type, u_int32_t event_id, google::protobuf::MessageLite& message, bool compress = true);
void SendLogEvent(const std::string& message, std::vector<std::string> tags, CARTA::ErrorSeverity severity);
void StartAnimationThread() {
// Not sure if needed... XXX
}

// uWebSockets
uWS::WebSocket<false, true, PerSocketData>* _socket;
Expand Down Expand Up @@ -346,7 +343,7 @@ class Session {
std::atomic<int> _ref_count;
int _animation_id;
bool _connected;
static int _num_sessions;
static volatile int _num_sessions;
static int _exit_after_num_seconds;
static bool _exit_when_all_sessions_closed;
static std::thread* _animation_thread;
Expand Down
28 changes: 19 additions & 9 deletions src/Session/SessionManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
SPDX-License-Identifier: GPL-3.0-or-later
*/

#include "SessionManager.h"
#include "ThreadingManager/ThreadingManager.h"
#include <sys/time.h>

#include "Logger/Logger.h"
#include "OnMessageTask.h"
#include "SessionManager.h"
#include "ThreadingManager/ThreadingManager.h"
#include "Util/Message.h"
#include "Util/Token.h"

Expand All @@ -17,18 +18,24 @@ namespace carta {
SessionManager::SessionManager(ProgramSettings& settings, std::string auth_token, std::shared_ptr<FileListHandler> file_list_handler)
: _session_number(0), _app(uWS::App()), _settings(settings), _auth_token(auth_token), _file_list_handler(file_list_handler) {}

void SessionManager::DeleteSession(int session_id) {
void SessionManager::DeleteSession(uint32_t session_id) {
Session* session = _sessions[session_id];
if (session) {
spdlog::info(
"Client {} [{}] Deleted. Remaining sessions: {}", session->GetId(), session->GetAddress(), Session::NumberOfSessions());
"Session {} [{}] Deleted. Remaining sessions: {}", session->GetId(), session->GetAddress(), Session::NumberOfSessions());
session->WaitForTaskCancellation();
session->CloseAllScriptingRequests();
if (!session->DecreaseRefCount()) {

if (!session->GetRefCount()) {
spdlog::info("Sessions in Session Map :");
for (const std::pair<uint32_t, Session*>& ssp : _sessions) {
Session* ss = ssp.second;
spdlog::info("\tMap id {}, session id {}, session ptr {}", ssp.first, ss->GetId(), fmt::ptr(ss));
}
delete session;
_sessions.erase(session_id);
} else {
spdlog::warn("Session {} reference count is not 0 ({}) on deletion!", session_id, session->GetRefCount());
spdlog::info("Session {} reference count is not 0 ({}) at this point in DeleteSession", session_id, session->GetRefCount());
}
} else {
spdlog::warn("Could not delete session {}: not found!", session_id);
Expand All @@ -51,9 +58,9 @@ void SessionManager::OnUpgrade(
return;
}

_session_number++;
// protect against overflow
_session_number = max(_session_number, 1u);
struct timeval tv;
robsimmonds marked this conversation as resolved.
Show resolved Hide resolved
gettimeofday(&tv, nullptr);
_session_number = (((uint32_t)tv.tv_sec) << 16) + ((uint32_t)tv.tv_usec);

http_response->template upgrade<PerSocketData>({_session_number, address}, //
http_request->getHeader("sec-websocket-key"), //
Expand Down Expand Up @@ -97,6 +104,7 @@ void SessionManager::OnDisconnect(WSType* ws, int code, std::string_view message
uint32_t session_id = static_cast<PerSocketData*>(ws->getUserData())->session_id;

// Delete the Session
_sessions[session_id]->DecreaseRefCount();
robsimmonds marked this conversation as resolved.
Show resolved Hide resolved
DeleteSession(session_id);

// Close the websockets
Expand Down Expand Up @@ -134,6 +142,7 @@ void SessionManager::OnMessage(WSType* ws, std::string_view sv_message, uWS::OpC
logger::LogReceivedEventType(event_type);

auto event_type_name = CARTA::EventType_Name(CARTA::EventType(event_type));

bool message_parsed(false);
OnMessageTask* tsk = nullptr;

Expand All @@ -148,6 +157,7 @@ void SessionManager::OnMessage(WSType* ws, std::string_view sv_message, uWS::OpC
}
case CARTA::EventType::RESUME_SESSION: {
CARTA::ResumeSession message;
spdlog::debug("({})({}) resuming session", fmt::ptr(session), session->GetId());
if (message.ParseFromArray(event_buf, event_length)) {
session->OnResumeSession(message, head.request_id);
message_parsed = true;
Expand Down
2 changes: 1 addition & 1 deletion src/Session/SessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SessionManager {
public:
using WSType = uWS::WebSocket<false, true, PerSocketData>;
SessionManager(ProgramSettings& settings, std::string auth_token, std::shared_ptr<FileListHandler>);
void DeleteSession(int session_id);
void DeleteSession(uint32_t session_id);
void OnUpgrade(uWS::HttpResponse<false>* http_response, uWS::HttpRequest* http_request, struct us_socket_context_t* context);
// Called on connection. Creates session objects and assigns UUID to it
void OnConnect(WSType* ws);
Expand Down
3 changes: 2 additions & 1 deletion src/ThreadingManager/ThreadingManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ void ThreadManager::StartEventHandlingThreads(int num_threads) {
do {
std::unique_lock<std::mutex> lock(_task_queue_mtx);

if (_task_queue.empty() || !(tsk = _task_queue.front())) {
if (_task_queue.empty()) {
_task_queue_cv.wait(lock);
} else {
tsk = _task_queue.front();
_task_queue.pop_front();
lock.unlock();
tsk->execute();
Expand Down