diff --git a/src/server/network/connection/connection.cpp b/src/server/network/connection/connection.cpp index 76c954af41c..190140d0972 100644 --- a/src/server/network/connection/connection.cpp +++ b/src/server/network/connection/connection.cpp @@ -34,15 +34,24 @@ void ConnectionManager::releaseConnection(const Connection_ptr &connection) { void ConnectionManager::closeAll() { connections.for_each([&](const Connection_ptr &connection) { - if (connection->socket.is_open()) { + if (connection && connection->socket.is_open()) { try { std::error_code error; connection->socket.shutdown(asio::ip::tcp::socket::shutdown_both, error); - if (error) { - g_logger().error("[ConnectionManager::closeAll] - Failed to close connection, system error code {}", error.message()); + if (error && error != asio::error::not_connected) { + g_logger().error("[ConnectionManager::closeAll] - Failed to shutdown connection: {}", error.message()); + } + + connection->socket.close(error); + if (error && error != asio::error::not_connected) { + g_logger().error("[ConnectionManager::closeAll] - Failed to close connection: {}", error.message()); } } catch (const std::system_error &systemError) { g_logger().error("[ConnectionManager::closeAll] - Exception caught: {}", systemError.what()); + } catch (const std::exception &e) { + g_logger().error("[ConnectionManager::closeAll] - Unexpected exception caught: {}", e.what()); + } catch (...) { + g_logger().error("[ConnectionManager::closeAll] - Unknown error occurred while closing connection"); } } }); @@ -50,29 +59,33 @@ void ConnectionManager::closeAll() { connections.clear(); } -Connection::Connection(asio::io_service &initIoService, ConstServicePort_ptr initservicePort) : +Connection::Connection(asio::io_context &initIoService, ConstServicePort_ptr initservicePort) : readTimer(initIoService), writeTimer(initIoService), service_port(std::move(initservicePort)), socket(initIoService), m_msg() { } -void Connection::close(bool force) { - ConnectionManager::getInstance().releaseConnection(shared_from_this()); - - std::scoped_lock lock(connectionLock); +void Connection::close(const bool force) { + const auto self = shared_from_this(); + ConnectionManager::getInstance().releaseConnection(self); ip = 0; - if (connectionState == CONNECTION_STATE_CLOSED) { + if (connectionState.exchange(CONNECTION_STATE_CLOSED) == CONNECTION_STATE_CLOSED) { return; } - connectionState = CONNECTION_STATE_CLOSED; if (protocol) { - g_dispatcher().addEvent([protocol = protocol] { protocol->release(); }, __FUNCTION__, std::chrono::milliseconds(CONNECTION_WRITE_TIMEOUT * 1000).count()); + auto weakProtocol = std::weak_ptr(protocol); + g_dispatcher().addEvent([weakProtocol] { + if (const auto protocol = weakProtocol.lock()) { + protocol->release(); + } + }, + __FUNCTION__, std::chrono::milliseconds(CONNECTION_WRITE_TIMEOUT * 1000).count()); } - if (messageQueue.empty() || force) { + if (messageQueue.was_empty() || force) { closeSocket(); } } @@ -83,9 +96,21 @@ void Connection::closeSocket() { } try { - readTimer.cancel(); - writeTimer.cancel(); - socket.cancel(); + std::error_code timerError; + readTimer.cancel(timerError); + if (timerError && timerError != asio::error::operation_aborted) { + g_logger().warn("[Connection::closeSocket] - Failed to cancel read timer: {}", timerError.message()); + } + + writeTimer.cancel(timerError); + if (timerError && timerError != asio::error::operation_aborted) { + g_logger().warn("[Connection::closeSocket] - Failed to cancel write timer: {}", timerError.message()); + } + + socket.cancel(timerError); + if (timerError && timerError != asio::error::operation_aborted) { + g_logger().warn("[Connection::closeSocket] - Failed to cancel socket operations: {}", timerError.message()); + } std::error_code error; socket.shutdown(asio::ip::tcp::socket::shutdown_both, error); @@ -98,83 +123,132 @@ void Connection::closeSocket() { g_logger().error("[Connection::closeSocket] - Failed to close socket: {}", error.message()); } } catch (const std::system_error &e) { - g_logger().error("[Connection::closeSocket] - error closeSocket: {}", e.what()); + g_logger().error("[Connection::closeSocket] - System error during socket close: {}", e.what()); + } catch (const std::exception &e) { + g_logger().error("[Connection::closeSocket] - Unexpected error during socket close: {}", e.what()); + } catch (...) { + g_logger().error("[Connection::closeSocket] - Unknown error occurred during socket close"); } } void Connection::accept(Protocol_ptr protocolPtr) { - connectionState = CONNECTION_STATE_IDENTIFYING; + connectionState.store(CONNECTION_STATE_IDENTIFYING); + protocol = std::move(protocolPtr); - g_dispatcher().addEvent([protocol = protocol] { protocol->onConnect(); }, __FUNCTION__, std::chrono::milliseconds(CONNECTION_WRITE_TIMEOUT * 1000).count()); + auto weakProtocol = std::weak_ptr(protocol); + + g_dispatcher().addEvent([weakProtocol] { + if (const auto protocol = weakProtocol.lock()) { + protocol->onConnect(); + } + }, + __FUNCTION__, std::chrono::milliseconds(CONNECTION_WRITE_TIMEOUT * 1000).count()); acceptInternal(false); } void Connection::acceptInternal(bool toggleParseHeader) { readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT)); - readTimer.async_wait([self = std::weak_ptr(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); }); + + auto weakSelf = std::weak_ptr(shared_from_this()); + + readTimer.async_wait([weakSelf](const std::error_code &error) { + if (const auto self = weakSelf.lock()) { + handleTimeout(self, error); + } + }); try { - asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [self = shared_from_this(), toggleParseHeader](const std::error_code &error, std::size_t N) { - if (toggleParseHeader) { - self->parseHeader(error); + asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf, toggleParseHeader](const std::error_code &error, std::size_t) { + if (const auto self = weakSelf.lock()) { + if (toggleParseHeader) { + self->parseHeader(error); + } else { + self->parseProxyIdentification(error); + } } else { - self->parseProxyIdentification(error); + g_logger().warn("[Connection::acceptInternal] - Connection no longer exists during async_read"); } }); } catch (const std::system_error &e) { - g_logger().error("[Connection::acceptInternal] - Exception in async_read: {}", e.what()); - close(FORCE_CLOSE); + g_logger().error("[Connection::acceptInternal] - System error in async_read: {}", e.what()); + close(true); + } catch (const std::exception &e) { + g_logger().error("[Connection::acceptInternal] - Unexpected error in async_read: {}", e.what()); + close(true); + } catch (...) { + g_logger().error("[Connection::acceptInternal] - Unknown error in async_read"); + close(true); } } void Connection::parseProxyIdentification(const std::error_code &error) { - std::scoped_lock lock(connectionLock); - readTimer.cancel(); + std::error_code timerError; + readTimer.cancel(timerError); + if (timerError && timerError != asio::error::operation_aborted) { + g_logger().warn("[Connection::parseProxyIdentification] - Failed to cancel read timer: {}", timerError.message()); + } - if (error || connectionState == CONNECTION_STATE_CLOSED) { + if (error || connectionState.load() == CONNECTION_STATE_CLOSED) { if (error != asio::error::operation_aborted && error != asio::error::eof && error != asio::error::connection_reset) { g_logger().error("[Connection::parseProxyIdentification] - Read error: {}", error.message()); } - close(FORCE_CLOSE); + close(true); return; } uint8_t* msgBuffer = m_msg.getBuffer(); auto charData = static_cast(static_cast(msgBuffer)); - std::string serverName = g_configManager().getString(SERVER_NAME) + "\n"; - if (connectionState == CONNECTION_STATE_IDENTIFYING) { - if (msgBuffer[1] == 0x00 || strncasecmp(charData, &serverName[0], 2) != 0) { - // Probably not proxy identification so let's try standard parsing method - connectionState = CONNECTION_STATE_OPEN; + const std::string serverName = g_configManager().getString(SERVER_NAME) + "\n"; + + if (connectionState.load() == CONNECTION_STATE_IDENTIFYING) { + if (msgBuffer[1] == 0x00 || strncasecmp(charData, serverName.c_str(), 2) != 0) { + connectionState.store(CONNECTION_STATE_OPEN); parseHeader(error); return; } else { - size_t remainder = serverName.length() - 2; + const size_t remainder = serverName.length() - 2; if (remainder > 0) { - connectionState = CONNECTION_STATE_READINGS; + connectionState.store(CONNECTION_STATE_READINGS); try { readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT)); - readTimer.async_wait([self = std::weak_ptr(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); }); - - // Read the remainder of proxy identification - asio::async_read(socket, asio::buffer(m_msg.getBuffer(), remainder), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->parseProxyIdentification(error); }); + auto weakSelf = std::weak_ptr(shared_from_this()); + + readTimer.async_wait([weakSelf](const std::error_code &error) { + if (const auto self = weakSelf.lock()) { + handleTimeout(self, error); + } + }); + + asio::async_read(socket, asio::buffer(m_msg.getBuffer(), remainder), [weakSelf](const std::error_code &error, std::size_t) { + if (const auto self = weakSelf.lock()) { + self->parseProxyIdentification(error); + } else { + g_logger().warn("[Connection::parseProxyIdentification] - Connection no longer exists during async_read"); + } + }); } catch (const std::system_error &e) { - g_logger().error("Connection::parseProxyIdentification] - error: {}", e.what()); - close(FORCE_CLOSE); + g_logger().error("[Connection::parseProxyIdentification] - System error in async_read: {}", e.what()); + close(true); + } catch (const std::exception &e) { + g_logger().error("[Connection::parseProxyIdentification] - Unexpected error in async_read: {}", e.what()); + close(true); + } catch (...) { + g_logger().error("[Connection::parseProxyIdentification] - Unknown error in async_read"); + close(true); } return; } else { - connectionState = CONNECTION_STATE_OPEN; + connectionState.store(CONNECTION_STATE_OPEN); } } - } else if (connectionState == CONNECTION_STATE_READINGS) { + } else if (connectionState.load() == CONNECTION_STATE_READINGS) { size_t remainder = serverName.length() - 2; if (strncasecmp(charData, &serverName[2], remainder) == 0) { - connectionState = CONNECTION_STATE_OPEN; + connectionState.store(CONNECTION_STATE_OPEN); } else { - g_logger().error("Connection::parseProxyIdentification] Invalid Client Login! Server Name mismatch!"); - close(FORCE_CLOSE); + g_logger().error("[Connection::parseProxyIdentification] - Invalid Client Login! Server Name mismatch!"); + close(true); return; } } @@ -183,16 +257,19 @@ void Connection::parseProxyIdentification(const std::error_code &error) { } void Connection::parseHeader(const std::error_code &error) { - std::scoped_lock lock(connectionLock); - readTimer.cancel(); + std::error_code timerError; + readTimer.cancel(timerError); + if (timerError && timerError != asio::error::operation_aborted) { + g_logger().warn("[Connection::parseHeader] - Failed to cancel read timer: {}", timerError.message()); + } if (error) { if (error != asio::error::operation_aborted && error != asio::error::eof && error != asio::error::connection_reset) { g_logger().debug("[Connection::parseHeader] - Read error: {}", error.message()); } - close(FORCE_CLOSE); + close(true); return; - } else if (connectionState == CONNECTION_STATE_CLOSED) { + } else if (connectionState.load() == CONNECTION_STATE_CLOSED) { return; } @@ -210,46 +287,63 @@ void Connection::parseHeader(const std::error_code &error) { uint16_t size = m_msg.getLengthHeader(); if (size == 0 || size > INPUTMESSAGE_MAXSIZE) { - close(FORCE_CLOSE); + close(true); return; } try { readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT)); - readTimer.async_wait([self = std::weak_ptr(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); }); + auto weakSelf = std::weak_ptr(shared_from_this()); + + readTimer.async_wait([weakSelf](const std::error_code &error) { + if (auto self = weakSelf.lock()) { + handleTimeout(self, error); + } + }); - // Read packet content m_msg.setLength(size + HEADER_LENGTH); - // Read the remainder of proxy identification - asio::async_read(socket, asio::buffer(m_msg.getBodyBuffer(), size), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->parsePacket(error); }); + + asio::async_read(socket, asio::buffer(m_msg.getBodyBuffer(), size), [weakSelf](const std::error_code &error, std::size_t) { + if (auto self = weakSelf.lock()) { + self->parsePacket(error); + } else { + g_logger().warn("[Connection::parseHeader] - Connection no longer exists during async_read"); + } + }); } catch (const std::system_error &e) { - g_logger().error("[Connection::parseHeader] - error: {}", e.what()); - close(FORCE_CLOSE); + g_logger().error("[Connection::parseHeader] - System error in async_read: {}", e.what()); + close(true); + } catch (const std::exception &e) { + g_logger().error("[Connection::parseHeader] - Unexpected error in async_read: {}", e.what()); + close(true); + } catch (...) { + g_logger().error("[Connection::parseHeader] - Unknown error in async_read"); + close(true); } } void Connection::parsePacket(const std::error_code &error) { - std::scoped_lock lock(connectionLock); - readTimer.cancel(); + std::error_code timerError; + readTimer.cancel(timerError); + if (timerError && timerError != asio::error::operation_aborted) { + g_logger().warn("[Connection::parsePacket] - Failed to cancel read timer: {}", timerError.message()); + } - if (error || connectionState == CONNECTION_STATE_CLOSED) { + if (error || connectionState.load() == CONNECTION_STATE_CLOSED) { if (error) { g_logger().error("[Connection::parsePacket] - Read error: {}", error.message()); } - close(FORCE_CLOSE); + close(true); return; } bool skipReadingNextPacket = false; if (!receivedFirst) { - // First message received receivedFirst = true; if (!protocol) { - // Check packet checksum uint32_t checksum; - if (int32_t len = m_msg.getLength() - m_msg.getBufferPosition() - CHECKSUM_LENGTH; - len > 0) { + if (int32_t len = m_msg.getLength() - m_msg.getBufferPosition() - CHECKSUM_LENGTH; len > 0) { checksum = adlerChecksum(m_msg.getBuffer() + m_msg.getBufferPosition() + CHECKSUM_LENGTH, len); } else { checksum = 0; @@ -257,160 +351,239 @@ void Connection::parsePacket(const std::error_code &error) { uint32_t recvChecksum = m_msg.get(); if (recvChecksum != checksum) { - // it might not have been the checksum, step back m_msg.skipBytes(-CHECKSUM_LENGTH); } - // Game protocol has already been created at this point protocol = service_port->make_protocol(recvChecksum == checksum, m_msg, shared_from_this()); if (!protocol) { - close(FORCE_CLOSE); + close(true); return; } } else { - // It is rather hard to detect if we have checksum or sequence method here so let's skip checksum check - // it doesn't generate any problem because olders protocol don't use 'server sends first' feature m_msg.get(); - // Skip protocol ID m_msg.skipBytes(1); } protocol->onRecvFirstMessage(m_msg); } else { - // Send the packet to the current protocol skipReadingNextPacket = protocol->onRecvMessage(m_msg); } try { readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT)); - readTimer.async_wait([self = std::weak_ptr(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); }); + auto weakSelf = std::weak_ptr(shared_from_this()); + + readTimer.async_wait([weakSelf](const std::error_code &error) { + if (const auto self = weakSelf.lock()) { + handleTimeout(self, error); + } + }); if (!skipReadingNextPacket) { - // Wait to the next packet - asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->parseHeader(error); }); + asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf](const std::error_code &error, std::size_t) { + if (const auto self = weakSelf.lock()) { + self->parseHeader(error); + } else { + g_logger().warn("[Connection::parsePacket] - Connection no longer exists during async_read"); + } + }); } } catch (const std::system_error &e) { - g_logger().error("[Connection::parsePacket] - error: {}", e.what()); - close(FORCE_CLOSE); + g_logger().error("[Connection::parsePacket] - System error in async_read: {}", e.what()); + close(true); + } catch (const std::exception &e) { + g_logger().error("[Connection::parsePacket] - Unexpected error in async_read: {}", e.what()); + close(true); + } catch (...) { + g_logger().error("[Connection::parsePacket] - Unknown error in async_read"); + close(true); } } void Connection::resumeWork() { + readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT)); - readTimer.async_wait([self = std::weak_ptr(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); }); + auto weakSelf = std::weak_ptr(shared_from_this()); + + readTimer.async_wait([weakSelf](const std::error_code &error) { + if (const auto self = weakSelf.lock()) { + handleTimeout(self, error); + } + }); try { - asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->parseHeader(error); }); + asio::async_read(socket, asio::buffer(m_msg.getBuffer(), HEADER_LENGTH), [weakSelf](const std::error_code &error, std::size_t bytesTransferred) { + if (const auto self = weakSelf.lock()) { + self->parseHeader(error); + } else { + g_logger().warn("[Connection::resumeWork] - Connection no longer exists during async_read"); + } + }); } catch (const std::system_error &e) { - g_logger().error("[Connection::resumeWork] - Exception in async_read: {}", e.what()); - close(FORCE_CLOSE); + g_logger().error("[Connection::resumeWork] - System error in async_read: {}", e.what()); + close(true); + } catch (const std::exception &e) { + g_logger().error("[Connection::resumeWork] - Unexpected error in async_read: {}", e.what()); + close(true); + } catch (...) { + g_logger().error("[Connection::resumeWork] - Unknown error in async_read"); + close(true); } } void Connection::send(const OutputMessage_ptr &outputMessage) { - std::scoped_lock lock(connectionLock); - if (connectionState == CONNECTION_STATE_CLOSED) { + if (connectionState.load() == CONNECTION_STATE_CLOSED) { return; } - bool noPendingWrite = messageQueue.empty(); - messageQueue.emplace_back(outputMessage); - - if (noPendingWrite) { + if (messageQueue.try_push(outputMessage)) { if (socket.is_open()) { + auto weakSelf = std::weak_ptr(shared_from_this()); + try { - asio::post(socket.get_executor(), [self = shared_from_this()] { self->internalWorker(); }); + asio::post(socket.get_executor(), [weakSelf] { + if (const auto self = weakSelf.lock()) { + self->internalWorker(); + } else { + g_logger().warn("[Connection::send] - Connection no longer exists during posting write operation"); + } + }); } catch (const std::system_error &e) { - g_logger().error("[Connection::send] - Exception in posting write operation: {}", e.what()); - close(FORCE_CLOSE); + g_logger().error("[Connection::send] - System error in posting write operation: {}", e.what()); + close(true); + } catch (const std::exception &e) { + g_logger().error("[Connection::send] - Unexpected error in posting write operation: {}", e.what()); + close(true); + } catch (...) { + g_logger().error("[Connection::send] - Unknown error in posting write operation"); + close(true); } } else { g_logger().error("[Connection::send] - Socket is not open for writing."); - close(FORCE_CLOSE); + close(true); } + } else { + g_logger().warn("[Connection::send] - Message queue is full. Discarding message."); } } void Connection::internalWorker() { - std::unique_lock lock(connectionLock); - if (messageQueue.empty()) { - if (connectionState == CONNECTION_STATE_CLOSED) { - closeSocket(); + OutputMessage_ptr outputMessage; + + if (messageQueue.try_pop(outputMessage)) { + if (protocol) { + protocol->onSendMessage(outputMessage); + internalSend(outputMessage); + } else { + g_logger().error("[Connection::internalWorker] - Protocol is null. Unable to send message."); } - return; + } else if (connectionState.load() == CONNECTION_STATE_CLOSED) { + closeSocket(); } - - const auto &outputMessage = messageQueue.front(); - lock.unlock(); - protocol->onSendMessage(outputMessage); - lock.lock(); - - internalSend(outputMessage); } uint32_t Connection::getIP() { - std::scoped_lock lock(connectionLock); - - if (ip == 1) { + if (ip.load() == 1) { std::error_code error; - asio::ip::tcp::endpoint endpoint = socket.remote_endpoint(error); + const asio::ip::tcp::endpoint endpoint = socket.remote_endpoint(error); if (error) { g_logger().error("[Connection::getIP] - Failed to get remote endpoint: {}", error.message()); - ip = 0; + ip.store(0); } else { - ip = htonl(endpoint.address().to_v4().to_uint()); + try { + if (endpoint.address().is_v4()) { + ip.store(htonl(endpoint.address().to_v4().to_uint())); + } else { + g_logger().error("[Connection::getIP] - Remote endpoint is not an IPv4 address"); + ip.store(0); + } + } catch (const std::exception &e) { + g_logger().error("[Connection::getIP] - Exception caught while getting IP: {}", e.what()); + ip.store(0); + } catch (...) { + g_logger().error("[Connection::getIP] - Unknown error occurred while getting IP"); + ip.store(0); + } } } - return ip; + return ip.load(); } void Connection::internalSend(const OutputMessage_ptr &outputMessage) { writeTimer.expires_from_now(std::chrono::seconds(CONNECTION_WRITE_TIMEOUT)); - writeTimer.async_wait([self = std::weak_ptr(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); }); + auto weakSelf = std::weak_ptr(shared_from_this()); + + writeTimer.async_wait([weakSelf](const std::error_code &error) { + if (const auto self = weakSelf.lock()) { + handleTimeout(self, error); + } + }); try { - asio::async_write(socket, asio::buffer(outputMessage->getOutputBuffer(), outputMessage->getLength()), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->onWriteOperation(error); }); + asio::async_write(socket, asio::buffer(outputMessage->getOutputBuffer(), outputMessage->getLength()), [weakSelf](const std::error_code &error, std::size_t bytesTransferred) { + if (const auto self = weakSelf.lock()) { + self->onWriteOperation(error); + } else { + g_logger().warn("[Connection::internalSend] - Connection no longer exists during async_write"); + } + }); } catch (const std::system_error &e) { - g_logger().error("[Connection::internalSend] - Exception in async_write: {}", e.what()); - close(FORCE_CLOSE); + g_logger().error("[Connection::internalSend] - System error in async_write: {}", e.what()); + close(true); + } catch (const std::exception &e) { + g_logger().error("[Connection::internalSend] - Unexpected error in async_write: {}", e.what()); + close(true); + } catch (...) { + g_logger().error("[Connection::internalSend] - Unknown error in async_write"); + close(true); } } void Connection::onWriteOperation(const std::error_code &error) { - std::unique_lock lock(connectionLock); - writeTimer.cancel(); + std::error_code timerError; + writeTimer.cancel(timerError); + if (timerError && timerError != asio::error::operation_aborted) { + g_logger().warn("[Connection::onWriteOperation] - Failed to cancel write timer: {}", timerError.message()); + } if (error) { g_logger().error("[Connection::onWriteOperation] - Write error: {}", error.message()); - messageQueue.clear(); - close(FORCE_CLOSE); + OutputMessage_ptr outputMessage; + while (messageQueue.try_pop(outputMessage)) { + // Aqui, estamos apenas removendo todos os elementos da fila. + } + close(true); return; } - messageQueue.pop_front(); - - if (!messageQueue.empty()) { - const auto &outputMessage = messageQueue.front(); - lock.unlock(); - protocol->onSendMessage(outputMessage); - lock.lock(); - internalSend(outputMessage); - } else if (connectionState == CONNECTION_STATE_CLOSED) { + OutputMessage_ptr outputMessage; + if (messageQueue.try_pop(outputMessage)) { + if (protocol) { + protocol->onSendMessage(outputMessage); + internalSend(outputMessage); + } else { + g_logger().error("[Connection::onWriteOperation] - Protocol is null. Unable to send message."); + close(true); + } + } else if (connectionState.load() == CONNECTION_STATE_CLOSED) { closeSocket(); } } -void Connection::handleTimeout(ConnectionWeak_ptr connectionWeak, const std::error_code &error) { +void Connection::handleTimeout(const ConnectionWeak_ptr &connectionWeak, const std::error_code &error) { if (error == asio::error::operation_aborted) { return; } - if (auto connection = connectionWeak.lock()) { + if (const auto connection = connectionWeak.lock()) { if (!error) { - g_logger().debug("Connection Timeout, IP: {}", convertIPToString(connection->getIP())); + g_logger().debug("[Connection::handleTimeout] - Connection Timeout, IP: {}", convertIPToString(connection->getIP())); } else { - g_logger().debug("Connection Timeout or error: {}, IP: {}", error.message(), convertIPToString(connection->getIP())); + g_logger().debug("[Connection::handleTimeout] - Timeout or error: {}, IP: {}", error.message(), convertIPToString(connection->getIP())); } - connection->close(FORCE_CLOSE); + + connection->close(true); + } else { + g_logger().warn("[Connection::handleTimeout] - Connection no longer exists when handling timeout"); } } diff --git a/src/server/network/connection/connection.hpp b/src/server/network/connection/connection.hpp index 139bfc73024..b4c2623db50 100644 --- a/src/server/network/connection/connection.hpp +++ b/src/server/network/connection/connection.hpp @@ -9,6 +9,7 @@ #pragma once +#include #include "declarations.hpp" // TODO: Remove circular includes (maybe shared_ptr?) #include "server/network/message/networkmessage.hpp" @@ -36,7 +37,7 @@ class ConnectionManager { static ConnectionManager &getInstance(); - Connection_ptr createConnection(asio::io_service &io_service, const ConstServicePort_ptr &servicePort); + Connection_ptr createConnection(asio::io_context &io_service, const ConstServicePort_ptr &servicePort); void releaseConnection(const Connection_ptr &connection); void closeAll(); @@ -47,7 +48,7 @@ class ConnectionManager { class Connection : public std::enable_shared_from_this { public: // Constructor - Connection(asio::io_service &initIoService, ConstServicePort_ptr initservicePort); + Connection(asio::io_context &initIoService, ConstServicePort_ptr initservicePort); // Constructor end // Destructor @@ -75,7 +76,7 @@ class Connection : public std::enable_shared_from_this { void onWriteOperation(const std::error_code &error); - static void handleTimeout(ConnectionWeak_ptr connectionWeak, const std::error_code &error); + static void handleTimeout(const ConnectionWeak_ptr &connectionWeak, const std::error_code &error); void closeSocket(); void internalWorker(); @@ -85,12 +86,10 @@ class Connection : public std::enable_shared_from_this { return socket; } - asio::high_resolution_timer readTimer; - asio::high_resolution_timer writeTimer; + asio::steady_timer readTimer; + asio::steady_timer writeTimer; - std::recursive_mutex connectionLock; - - std::list messageQueue; + atomic_queue::AtomicQueue2 messageQueue; ConstServicePort_ptr service_port; Protocol_ptr protocol; @@ -101,9 +100,9 @@ class Connection : public std::enable_shared_from_this { std::time_t timeConnected = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); uint32_t packetsSent = 0; - uint32_t ip = 1; + std::atomic ip { 1 }; - std::underlying_type_t connectionState = CONNECTION_STATE_OPEN; + std::atomic connectionState = CONNECTION_STATE_OPEN; bool receivedFirst = false; friend class ServicePort; diff --git a/src/server/server.cpp b/src/server/server.cpp index 2ba8f36dfd0..e515aaf9496 100644 --- a/src/server/server.cpp +++ b/src/server/server.cpp @@ -44,9 +44,14 @@ void ServiceManager::stop() { running = false; - for (auto &servicePortIt : acceptors) { + for (auto &val : acceptors | std::views::values) { try { - io_service.post([servicePort = servicePortIt.second] { servicePort->onStopServer(); }); + auto weakServicePort = std::weak_ptr(val); + io_service.post([weakServicePort] { + if (const auto servicePort = weakServicePort.lock()) { + servicePort->onStopServer(); + } + }); } catch (const std::system_error &e) { g_logger().warn("[ServiceManager::stop] - Network error: {}", e.what()); } @@ -56,7 +61,9 @@ void ServiceManager::stop() { death_timer.expires_from_now(std::chrono::seconds(3)); death_timer.async_wait([this](const std::error_code &err) { - die(); + if (!err) { + die(); + } }); } @@ -88,7 +95,11 @@ void ServicePort::accept() { } auto connection = ConnectionManager::getInstance().createConnection(io_service, shared_from_this()); - acceptor->async_accept(connection->getSocket(), [self = shared_from_this(), connection](const std::error_code &error) { self->onAccept(connection, error); }); + acceptor->async_accept(connection->getSocket(), [weakSelf = std::weak_ptr(shared_from_this()), connection](const std::error_code &error) { + if (const auto self = weakSelf.lock()) { + self->onAccept(connection, error); + } + }); } void ServicePort::onAccept(const Connection_ptr &connection, const std::error_code &error) { @@ -114,8 +125,15 @@ void ServicePort::onAccept(const Connection_ptr &connection, const std::error_co if (!pendingStart) { close(); pendingStart = true; + + auto weakSelf = std::weak_ptr(shared_from_this()); g_dispatcher().scheduleEvent( - 15000, [self = shared_from_this(), serverPort = serverPort] { ServicePort::openAcceptor(std::weak_ptr(self), serverPort); }, "ServicePort::openAcceptor" + 15000, [weakSelf, serverPort = serverPort] { + if (auto self = weakSelf.lock()) { + openAcceptor(weakSelf, serverPort); + } + }, + "ServicePort::openAcceptor" ); } } @@ -123,12 +141,8 @@ void ServicePort::onAccept(const Connection_ptr &connection, const std::error_co Protocol_ptr ServicePort::make_protocol(bool checksummed, NetworkMessage &msg, const Connection_ptr &connection) const { const uint8_t protocolID = msg.getByte(); - for (auto &service : services) { - if (protocolID != service->get_protocol_identifier()) { - continue; - } - - if ((checksummed && service->is_checksummed()) || !service->is_checksummed()) { + for (const auto &service : services) { + if (protocolID == service->get_protocol_identifier() && ((checksummed && service->is_checksummed()) || !service->is_checksummed())) { return service->make_protocol(connection); } } @@ -152,22 +166,27 @@ void ServicePort::open(uint16_t port) { pendingStart = false; try { - if (g_configManager().getBoolean(BIND_ONLY_GLOBAL_ADDRESS)) { - acceptor = std::make_unique(io_service, asio::ip::tcp::endpoint(asio::ip::address(asio::ip::address_v4::from_string(g_configManager().getString(IP))), serverPort)); - } else { - acceptor = std::make_unique(io_service, asio::ip::tcp::endpoint(asio::ip::address(asio::ip::address_v4(INADDR_ANY)), serverPort)); - } + auto endpoint = g_configManager().getBoolean(BIND_ONLY_GLOBAL_ADDRESS) + ? asio::ip::tcp::endpoint(asio::ip::address::from_string(g_configManager().getString(IP)), serverPort) + : asio::ip::tcp::endpoint(asio::ip::tcp::v4(), serverPort); + acceptor = std::make_unique(io_service, endpoint); acceptor->set_option(asio::ip::tcp::no_delay(true)); accept(); } catch (const std::system_error &e) { - g_logger().warn("[ServicePort::open] - Error code: {}", e.what()); + g_logger().warn("[ServicePort::open] - Failed to open acceptor, error: {}", e.what()); pendingStart = true; + auto weakSelf = std::weak_ptr(shared_from_this()); g_dispatcher().scheduleEvent( 15000, - [self = shared_from_this(), port] { ServicePort::openAcceptor(std::weak_ptr(self), port); }, "ServicePort::openAcceptor" + [weakSelf, port] { + if (auto self = weakSelf.lock()) { + openAcceptor(weakSelf, port); + } + }, + "ServicePort::openAcceptor" ); } } @@ -176,6 +195,9 @@ void ServicePort::close() const { if (acceptor && acceptor->is_open()) { std::error_code error; acceptor->close(error); + if (error) { + g_logger().warn("[ServicePort::close] - Failed to close acceptor: {}", error.message()); + } } } diff --git a/src/server/server.hpp b/src/server/server.hpp index 0a658f4d26f..e4333a4a712 100644 --- a/src/server/server.hpp +++ b/src/server/server.hpp @@ -49,7 +49,7 @@ class Service final : public ServiceBase { class ServicePort : public std::enable_shared_from_this { public: - explicit ServicePort(asio::io_service &init_io_service) : + explicit ServicePort(asio::io_context &init_io_service) : io_service(init_io_service) { } ~ServicePort(); @@ -72,7 +72,7 @@ class ServicePort : public std::enable_shared_from_this { private: void accept(); - asio::io_service &io_service; + asio::io_context &io_service; std::unique_ptr acceptor; std::vector services; @@ -104,9 +104,9 @@ class ServiceManager { phmap::flat_hash_map acceptors; - asio::io_service io_service; + asio::io_context io_service; Signals signals { io_service }; - asio::high_resolution_timer death_timer { io_service }; + asio::steady_timer death_timer { io_service }; bool running = false; }; @@ -119,16 +119,16 @@ bool ServiceManager::add(uint16_t port) { return false; } - ServicePort_ptr service_port; + auto [iter, inserted] = acceptors.emplace(port, nullptr); - const auto foundServicePort = acceptors.find(port); + ServicePort_ptr service_port; - if (foundServicePort == acceptors.end()) { + if (inserted) { service_port = std::make_shared(io_service); service_port->open(port); - acceptors[port] = service_port; + iter->second = service_port; } else { - service_port = foundServicePort->second; + service_port = iter->second; if (service_port->is_single_socket() || ProtocolType::SERVER_SENDS_FIRST) { g_logger().error("[ServiceManager::add] - " @@ -138,5 +138,11 @@ bool ServiceManager::add(uint16_t port) { } } - return service_port->add_service(std::make_shared>()); + auto service = std::make_shared>(); + if (!service_port->add_service(service)) { + g_logger().warn("[ServiceManager::add] - Failed to add service {} to port {}", ProtocolType::protocol_name(), port); + return false; + } + + return true; } diff --git a/src/server/signals.cpp b/src/server/signals.cpp index 1e6e0e4ece7..dfa1225683a 100644 --- a/src/server/signals.cpp +++ b/src/server/signals.cpp @@ -21,7 +21,7 @@ #include "lua/scripts/lua_environment.hpp" #include "lib/di/container.hpp" -Signals::Signals(asio::io_service &service) : +Signals::Signals(asio::io_context &service) : set(service) { set.add(SIGINT); set.add(SIGTERM); diff --git a/src/server/signals.hpp b/src/server/signals.hpp index 4d46e24e129..9cea2d63f86 100644 --- a/src/server/signals.hpp +++ b/src/server/signals.hpp @@ -13,7 +13,7 @@ class Signals { asio::signal_set set; public: - explicit Signals(asio::io_service &service); + explicit Signals(asio::io_context &service); private: void asyncWait();