Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing a heartbeat client #40

Merged
merged 11 commits into from
Apr 5, 2024
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ set(XEUS_ZMQ_SOURCES
${XEUS_ZMQ_SOURCE_DIR}/xdealer_channel.cpp
${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.hpp
${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.cpp
${XEUS_ZMQ_SOURCE_DIR}/xheartbeat_client.hpp
${XEUS_ZMQ_SOURCE_DIR}/xheartbeat_client.cpp
)

# Targets and link
Expand Down
3 changes: 3 additions & 0 deletions include/xeus-zmq/xclient_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace xeus
public:

using listener = std::function<void(xmessage)>;
using kernel_status_listener = std::function<void(bool)>;

explicit xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl);
~xclient_zmq();
Expand All @@ -42,10 +43,12 @@ namespace xeus
void register_shell_listener(const listener& l);
void register_control_listener(const listener& l);
void register_iopub_listener(const listener& l);
void register_kernel_status_listener(const kernel_status_listener& l);

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xmessage msg);
void notify_kernel_dead(bool status);

std::size_t iopub_queue_size() const;
std::optional<xmessage> pop_iopub_message();
Expand Down
10 changes: 10 additions & 0 deletions src/xclient_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ namespace xeus
p_client_impl->register_iopub_listener(l);
}

void xclient_zmq::register_kernel_status_listener(const kernel_status_listener& l)
{
p_client_impl->register_kernel_status_listener(l);
}

void xclient_zmq::notify_shell_listener(xmessage msg)
{
p_client_impl->notify_shell_listener(std::move(msg));
Expand All @@ -73,6 +78,11 @@ namespace xeus
p_client_impl->notify_iopub_listener(std::move(msg));
}

void xclient_zmq::notify_kernel_dead(bool status)
{
p_client_impl->notify_kernel_dead(status);
}

std::size_t xclient_zmq::iopub_queue_size() const
{
return p_client_impl->iopub_queue_size();
Expand Down
19 changes: 18 additions & 1 deletion src/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace xeus
, m_shell_client(context, config.m_transport, config.m_ip, config.m_shell_port)
, m_control_client(context, config.m_transport, config.m_ip, config.m_control_port)
, m_iopub_client(context, config)
, m_heartbeat_client(context, config)
, p_messenger(context)
, m_error_handler(eh)
{
Expand Down Expand Up @@ -93,6 +94,11 @@ namespace xeus
m_iopub_listener = l;
}

void xclient_zmq_impl::register_kernel_status_listener(const kernel_status_listener& l)
{
m_kernel_status_listener = l;
}

void xclient_zmq_impl::connect()
{
p_messenger.connect();
Expand All @@ -118,6 +124,11 @@ namespace xeus
m_iopub_listener(std::move(msg));
}

void xclient_zmq_impl::notify_kernel_dead(bool status)
{
m_kernel_status_listener(status);
}

void xclient_zmq_impl::poll(long timeout)
{
zmq::multipart_t wire_msg;
Expand Down Expand Up @@ -166,14 +177,20 @@ namespace xeus
void xclient_zmq_impl::start()
{
start_iopub_thread();
// TODO : Introduce a client, xheartbeat_client that runs on its own thread, m_heartbeat_thread.
// pass in desired timeout value below
start_heartbeat_thread();
anutosh491 marked this conversation as resolved.
Show resolved Hide resolved
}

void xclient_zmq_impl::start_iopub_thread()
{
m_iopub_thread = std::move(xthread(&xiopub_client::run, p_iopub_client.get()));
}

void xclient_zmq_impl::start_heartbeat_thread(long timeout)
{
m_heartbeat_thread = std::move(xthread(&xheartbeat_client::run, p_heartbeat_client.get(), timeout));
}

xmessage xclient_zmq_impl::deserialize(zmq::multipart_t& wire_msg) const
{
return xzmq_serializer::deserialize(wire_msg, *p_auth);
Expand Down
13 changes: 11 additions & 2 deletions src/xclient_zmq_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "xdealer_channel.hpp"
#include "xiopub_client.hpp"
#include "xheartbeat_client.hpp"
#include "xclient_messenger.hpp"

namespace xeus
Expand All @@ -31,7 +32,9 @@ namespace xeus
{
public:
using iopub_client_ptr = std::unique_ptr<xiopub_client>;
using heartbeat_client_ptr = std::unique_ptr<xheartbeat_client>;
using listener = std::function<void(xmessage)>;
using kernel_status_listener = std::function<void(bool)>;

xclient_zmq_impl(zmq::context_t& context,
const xconfiguration& config,
Expand Down Expand Up @@ -60,8 +63,8 @@ namespace xeus
std::optional<xmessage> pop_iopub_message();
void register_iopub_listener(const listener& l);

// hearbeat channel
// TODO
// heartbeat channel
void register_kernel_status_listener(const kernel_status_listener& l);

// client messenger
void connect();
Expand All @@ -70,6 +73,7 @@ namespace xeus
void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xmessage msg);
void notify_kernel_dead(bool status);

void wait_for_message();
void start();
Expand All @@ -78,6 +82,7 @@ namespace xeus

private:
void start_iopub_thread();
void start_heartbeat_thread(long timeout);
void poll(long timeout);

using authentication_ptr = std::unique_ptr<xauthentication>;
Expand All @@ -86,6 +91,7 @@ namespace xeus
xdealer_channel m_shell_client;
xdealer_channel m_control_client;
xiopub_client m_iopub_client;
xheartbeat_client m_heartbeat_client;

xclient_messenger p_messenger;

Expand All @@ -94,10 +100,13 @@ namespace xeus
listener m_shell_listener;
listener m_control_listener;
listener m_iopub_listener;
kernel_status_listener m_kernel_status_listener;

iopub_client_ptr p_iopub_client;
heartbeat_client_ptr p_heartbeat_client;

xthread m_iopub_thread;
xthread m_heartbeat_thread;
};
}

Expand Down
68 changes: 68 additions & 0 deletions src/xheartbeat_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/***************************************************************************
* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou *
* Copyright (c) 2016, QuantStack *
* *
* Distributed under the terms of the BSD 3-Clause License. *
* *
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include "xheartbeat_client.hpp"
#include "xclient_zmq_impl.hpp"

#include "xeus-zmq/xmiddleware.hpp"

namespace xeus
{

xheartbeat_client::xheartbeat_client(zmq::context_t& context,
const xeus::xconfiguration& config)
: m_heartbeat(context, zmq::socket_type::req)
, m_controller(context, zmq::socket_type::rep)
{
m_heartbeat.connect(get_end_point(config.m_transport, config.m_ip, config.m_hb_port));
init_socket(m_controller, get_controller_end_point("heartbeat"));
}

xheartbeat_client::~xheartbeat_client()
{
}

void xheartbeat_client::send_heartbeat_message()
{
zmq::message_t ping_msg("ping", 4);
m_heartbeat.send(ping_msg, zmq::send_flags::none);
}

bool xheartbeat_client::wait_for_answer(long timeout)
{
m_heartbeat.set(zmq::sockopt::linger, static_cast<int>(timeout));
zmq::message_t response;
return m_heartbeat.recv(response).has_value();
}

void xheartbeat_client::run(long timeout)
{
bool stop = false;
int retry_count = 0;
const int max_retries = 3;

while(!stop)
{
send_heartbeat_message();
if(!wait_for_answer(timeout))
{
if (retry_count < max_retries)
{
++retry_count;
} else {
p_client_impl->notify_kernel_dead(true);
anutosh491 marked this conversation as resolved.
Show resolved Hide resolved
stop = true;
}
} else {
retry_count = 0;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
43 changes: 43 additions & 0 deletions src/xheartbeat_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/***************************************************************************
* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou *
* Copyright (c) 2016, QuantStack *
* *
* Distributed under the terms of the BSD 3-Clause License. *
* *
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#ifndef XEUS_HEARTBEAT_CLIENT_HPP
#define XEUS_HEARTBEAT_CLIENT_HPP

#include "zmq.hpp"

#include "xeus/xkernel_configuration.hpp"

namespace xeus
{
class xclient_zmq_impl;

class xheartbeat_client
{
public:

xheartbeat_client(zmq::context_t& context,
const xeus::xconfiguration& config);

~xheartbeat_client();

void run(long timeout);

private:
void send_heartbeat_message();
bool wait_for_answer(long timeout);

zmq::socket_t m_heartbeat;
zmq::socket_t m_controller;

xclient_zmq_impl* p_client_impl;
};
}

#endif
5 changes: 0 additions & 5 deletions src/xiopub_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@
#include <mutex>

#include "zmq.hpp"
#include "nlohmann/json.hpp"

#include "xeus/xmessage.hpp"
#include "xeus/xeus_context.hpp"
#include "xeus/xkernel_configuration.hpp"

#include "xeus-zmq/xthread.hpp"
#include "xeus-zmq/xmiddleware.hpp"

namespace xeus
{
class xclient_zmq_impl;
Expand Down
Loading