Skip to content

Commit

Permalink
[core] Split ZMQ service
Browse files Browse the repository at this point in the history
  • Loading branch information
zach2good committed Apr 23, 2022
1 parent e3501c1 commit aad95de
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 97 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ include(Platform)
include(StandardProjectSettings)
include(CompilerWarnings)
include(Sanitizers)
include(Tracy)
include(Valgrind)

if(WIN32)
Expand Down
44 changes: 0 additions & 44 deletions cmake/Tracy.cmake

This file was deleted.

61 changes: 37 additions & 24 deletions ext/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,41 @@ CPMAddPackage(
GIT_TAG v1.0.3
)

# Enable Tracy on command-line with 'cmake -DTRACY_ENABLE=ON ..'
option(TRACY_ENABLE "Enable Tracy profiling." OFF)
message(STATUS "TRACY_ENABLE: ${TRACY_ENABLE}")

if(TRACY_ENABLE)
CPMAddPackage(
NAME tracy
GITHUB_REPOSITORY wolfpld/tracy
VERSION 0.7.8
)

if (tracy_ADDED)
add_library(tracy_client STATIC ${tracy_SOURCE_DIR}/TracyClient.cpp)
target_include_directories(tracy_client
PUBLIC
$<BUILD_INTERFACE:${tracy_SOURCE_DIR}>
)
target_compile_definitions(tracy_client PUBLIC TRACY_ENABLE TRACY_ON_DEMAND TRACY_NO_EXIT TRACY_NO_BROADCAST)
target_link_libraries(tracy_client PRIVATE no_warnings)

if(MSVC AND CMAKE_SIZEOF_VOID_P EQUAL 4)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /LARGEADDRESSAWARE")
endif()
if(TRACY_ENABLE OR ENABLE_TRACY OR TRACY_ENABLED OR TRACY)
message(STATUS "TRACY_ENABLE: ON")

# Download dev libs
set(TRACY_LINK https://github.com/wolfpld/tracy/archive/v0.8.1.tar.gz)
if(NOT EXISTS ${CMAKE_SOURCE_DIR}/ext/tracy/tracy-0.8.1/TracyClient.cpp)
message(STATUS "Downloading Tracy development library")
file(MAKE_DIRECTORY ${CMAKE_SOURCE_DIR}/tracy)
file(DOWNLOAD
${TRACY_LINK}
${CMAKE_SOURCE_DIR}/ext/tracy/tracy.tar.gz
TIMEOUT 60)
execute_process(COMMAND "${CMAKE_COMMAND}" -E
tar xvf "${CMAKE_SOURCE_DIR}/ext/tracy/tracy.tar.gz"
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/ext/tracy)

# Download client
message(STATUS "Downloading Tracy client")
file(DOWNLOAD
https://github.com/wolfpld/tracy/releases/download/v0.8.1/Tracy-0.8.1.7z
${CMAKE_SOURCE_DIR}/tracy.tar.gz
TIMEOUT 60)
execute_process(COMMAND "${CMAKE_COMMAND}" -E
tar xvf "${CMAKE_SOURCE_DIR}/tracy.tar.gz"
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/)
endif()

add_library(tracy_client ${CMAKE_SOURCE_DIR}/ext/tracy/tracy-0.8.1/TracyClient.cpp)
target_include_directories(tracy_client PUBLIC ${CMAKE_SOURCE_DIR}/ext/tracy/tracy-0.8.1/)
target_compile_definitions(tracy_client PUBLIC
TRACY_ENABLE TRACY_ON_DEMAND TRACY_NO_EXIT TRACY_NO_BROADCAST TRACY_TIMER_QPC TRACY_TIMER_FALLBACK)

if(MSVC AND CMAKE_SIZEOF_VOID_P EQUAL 4)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /LARGEADDRESSAWARE")
endif()
endif(TRACY_ENABLE)
else()
message(STATUS "TRACY_ENABLE: OFF")
endif()
70 changes: 48 additions & 22 deletions src/common/zmq_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,43 @@ along with this program. If not, see http://www.gnu.org/licenses/
#include "socket.h"

ZMQService::ZMQService(zmq::socket_type type)
{
running = true;
zmqThread = std::thread(&ZMQService::_init, this, type);
}

ZMQService::~ZMQService()
{
running = false;
if (zmqThread.joinable())
{
zmqThread.join();
}
}

void ZMQService::send(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet)
{
chat_message_t msg;
msg.dest = ipp;
msg.type = type;
msg.data.copy(*extra);
msg.packet.copy(*packet);
outgoing_queue.enqueue(std::move(msg));
}

auto ZMQService::recv() -> std::vector<chat_message_t>
{ std::vector<chat_message_t> vec(32);
incoming_queue.try_dequeue_bulk(vec.begin(), 32);
return vec;
}

void ZMQService::_init(zmq::socket_type type)
{
zmqSql = std::make_unique<SqlConnection>(SettingsManager::Get<std::string>(SqlSettings::LOGIN).c_str(),
SettingsManager::Get<std::string>(SqlSettings::PASSWORD).c_str(),
SettingsManager::Get<std::string>(SqlSettings::HOST).c_str(),
SettingsManager::Get<unsigned int>(SqlSettings::PORT),
SettingsManager::Get<std::string>(SqlSettings::DATABASE).c_str());
SettingsManager::Get<std::string>(SqlSettings::PASSWORD).c_str(),
SettingsManager::Get<std::string>(SqlSettings::HOST).c_str(),
SettingsManager::Get<unsigned int>(SqlSettings::PORT),
SettingsManager::Get<std::string>(SqlSettings::DATABASE).c_str());

pContext = std::make_unique<zmq::context_t>(1);
pSocket = std::make_unique<zmq::socket_t>(*pContext, type);
Expand All @@ -55,10 +86,10 @@ ZMQService::ZMQService(zmq::socket_type type)
std::exit(-1);
}

listen();
_listen();
}

ZMQService::~ZMQService()
void ZMQService::_destroy()
{
if (pSocket)
{
Expand All @@ -75,17 +106,7 @@ ZMQService::~ZMQService()
}
}

void ZMQService::queue(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet)
{
chat_message_t msg;
msg.dest = ipp;
msg.type = type;
msg.data.copy(*extra);
msg.packet.copy(*packet);
outgoing_queue.enqueue(std::move(msg));
}

void ZMQService::send(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet)
void ZMQService::_send(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet)
{
try
{
Expand All @@ -102,7 +123,7 @@ void ZMQService::send(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::
}
}

void ZMQService::parse(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet, zmq::message_t* from)
void ZMQService::_parse(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet, zmq::message_t* from)
{
int ret = SQL_ERROR;
in_addr from_ip;
Expand Down Expand Up @@ -236,23 +257,26 @@ void ZMQService::parse(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t*
}
}

void ZMQService::listen()
void ZMQService::_listen()
{
while (true)
while (running)
{
std::array<zmq::message_t, 4> msgs;
try
{
chat_message_t msg;

const auto ret = zmq::recv_multipart_n(*pSocket, msgs.data(), msgs.size());
if (!ret)
{
chat_message_t msg;
while (outgoing_queue.try_dequeue(msg))
{
send(msg.dest, msg.type, &msg.data, &msg.packet);
}
continue;
}

incoming_queue.enqueue(std::move(msg));
}
catch (zmq::error_t& e)
{
Expand All @@ -271,8 +295,10 @@ void ZMQService::listen()
// 1: zmq::message_t type;
// 2: zmq::message_t extra;
// 3: zmq::message_t packet;
parse((MSGSERVTYPE)ref<uint8>((uint8*)msgs[1].data(), 0), &msgs[2], &msgs[3], &msgs[0]);
_parse((MSGSERVTYPE)ref<uint8>((uint8*)msgs[1].data(), 0), &msgs[2], &msgs[3], &msgs[0]);

zmqSql->TryPing();
}

_destroy();
}
18 changes: 15 additions & 3 deletions src/common/zmq_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <concurrentqueue.h>

#include "logging_service.h"
Expand All @@ -47,15 +48,26 @@ class ZMQService
ZMQService(zmq::socket_type type);
~ZMQService();

void queue(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet);
void send(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet);
void parse(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet, zmq::message_t* from);
void listen();
auto recv() -> std::vector<chat_message_t>;

private:
// For use by the internal thread only
void _init(zmq::socket_type type);
void _destroy();
void _send(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet);
void _parse(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet, zmq::message_t* from);
void _listen();

std::thread zmqThread;
std::atomic<bool> running;

std::unique_ptr<zmq::context_t> pContext;
std::unique_ptr<zmq::socket_t> pSocket;
std::string address;

std::unique_ptr<SqlConnection> zmqSql;

moodycamel::ConcurrentQueue<chat_message_t> incoming_queue;
moodycamel::ConcurrentQueue<chat_message_t> outgoing_queue;
};
2 changes: 1 addition & 1 deletion src/map/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ add_executable(xi_zone
${SOURCES}
${resource})

target_include_directories(topaz_game PUBLIC ${module_include_dirs})
target_include_directories(xi_zone PUBLIC ${module_include_dirs})

if(WIN32)
set_target_properties(xi_zone PROPERTIES OUTPUT_NAME xi_zone)
Expand Down
4 changes: 2 additions & 2 deletions src/map/status_effect_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ uint8 CStatusEffectContainer::GetActiveRuneCount()

EFFECT CStatusEffectContainer::GetHighestRuneEffect()
{
std::unordered_map<EFFECT,uint8> runeEffects;
std::unordered_map<EFFECT, uint8> runeEffects;

for (CStatusEffect* PStatusEffect : m_StatusEffectSet)
{
Expand All @@ -1162,7 +1162,7 @@ EFFECT CStatusEffectContainer::GetHighestRuneEffect()
}

EFFECT highestRune = EFFECT_NONE;
int highestRuneValue;
uint8 highestRuneValue = 0;

for (auto iter = runeEffects.begin(); iter != runeEffects.end(); ++iter)
{
Expand Down

0 comments on commit aad95de

Please sign in to comment.