Skip to content

Commit

Permalink
Merge pull request #40 from anutosh491/heartbeat_client
Browse files Browse the repository at this point in the history
Introducing a heartbeat client
  • Loading branch information
JohanMabille authored Apr 5, 2024
2 parents ed541c0 + 3bb4e84 commit 42a6f73
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ set(XEUS_ZMQ_SOURCES
${XEUS_ZMQ_SOURCE_DIR}/xdealer_channel.cpp
${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.hpp
${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.cpp
${XEUS_ZMQ_SOURCE_DIR}/xheartbeat_client.hpp
${XEUS_ZMQ_SOURCE_DIR}/xheartbeat_client.cpp
)

# Targets and link
Expand Down
3 changes: 3 additions & 0 deletions include/xeus-zmq/xclient_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace xeus
public:

using listener = std::function<void(xmessage)>;
using kernel_status_listener = std::function<void(bool)>;

explicit xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl);
~xclient_zmq();
Expand All @@ -42,10 +43,12 @@ namespace xeus
void register_shell_listener(const listener& l);
void register_control_listener(const listener& l);
void register_iopub_listener(const listener& l);
void register_kernel_status_listener(const kernel_status_listener& l);

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xmessage msg);
void notify_kernel_dead(bool status);

std::size_t iopub_queue_size() const;
std::optional<xmessage> pop_iopub_message();
Expand Down
10 changes: 10 additions & 0 deletions src/xclient_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ namespace xeus
p_client_impl->register_iopub_listener(l);
}

void xclient_zmq::register_kernel_status_listener(const kernel_status_listener& l)
{
p_client_impl->register_kernel_status_listener(l);
}

void xclient_zmq::notify_shell_listener(xmessage msg)
{
p_client_impl->notify_shell_listener(std::move(msg));
Expand All @@ -73,6 +78,11 @@ namespace xeus
p_client_impl->notify_iopub_listener(std::move(msg));
}

void xclient_zmq::notify_kernel_dead(bool status)
{
p_client_impl->notify_kernel_dead(status);
}

std::size_t xclient_zmq::iopub_queue_size() const
{
return p_client_impl->iopub_queue_size();
Expand Down
18 changes: 17 additions & 1 deletion src/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace xeus
, m_shell_client(context, config.m_transport, config.m_ip, config.m_shell_port)
, m_control_client(context, config.m_transport, config.m_ip, config.m_control_port)
, m_iopub_client(context, config)
, m_heartbeat_client(context, config, m_max_retry, m_heartbeat_timeout)
, p_messenger(context)
, m_error_handler(eh)
{
Expand Down Expand Up @@ -93,6 +94,11 @@ namespace xeus
m_iopub_listener = l;
}

void xclient_zmq_impl::register_kernel_status_listener(const kernel_status_listener& l)
{
m_heartbeat_client.register_kernel_status_listener(l);
}

void xclient_zmq_impl::connect()
{
p_messenger.connect();
Expand All @@ -118,6 +124,11 @@ namespace xeus
m_iopub_listener(std::move(msg));
}

void xclient_zmq_impl::notify_kernel_dead(bool status)
{
m_heartbeat_client.notify_kernel_dead(status);
}

void xclient_zmq_impl::poll(long timeout)
{
zmq::multipart_t wire_msg;
Expand Down Expand Up @@ -166,14 +177,19 @@ namespace xeus
void xclient_zmq_impl::start()
{
start_iopub_thread();
// TODO : Introduce a client, xheartbeat_client that runs on its own thread, m_heartbeat_thread.
start_heartbeat_thread();
}

void xclient_zmq_impl::start_iopub_thread()
{
m_iopub_thread = std::move(xthread(&xiopub_client::run, p_iopub_client.get()));
}

void xclient_zmq_impl::start_heartbeat_thread()
{
m_heartbeat_thread = std::move(xthread(&xheartbeat_client::run, p_heartbeat_client.get()));
}

xmessage xclient_zmq_impl::deserialize(zmq::multipart_t& wire_msg) const
{
return xzmq_serializer::deserialize(wire_msg, *p_auth);
Expand Down
15 changes: 13 additions & 2 deletions src/xclient_zmq_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "xdealer_channel.hpp"
#include "xiopub_client.hpp"
#include "xheartbeat_client.hpp"
#include "xclient_messenger.hpp"

namespace xeus
Expand All @@ -31,7 +32,9 @@ namespace xeus
{
public:
using iopub_client_ptr = std::unique_ptr<xiopub_client>;
using heartbeat_client_ptr = std::unique_ptr<xheartbeat_client>;
using listener = std::function<void(xmessage)>;
using kernel_status_listener = std::function<void(bool)>;

xclient_zmq_impl(zmq::context_t& context,
const xconfiguration& config,
Expand Down Expand Up @@ -60,8 +63,8 @@ namespace xeus
std::optional<xmessage> pop_iopub_message();
void register_iopub_listener(const listener& l);

// hearbeat channel
// TODO
// heartbeat channel
void register_kernel_status_listener(const kernel_status_listener& l);

// client messenger
void connect();
Expand All @@ -70,6 +73,7 @@ namespace xeus
void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xmessage msg);
void notify_kernel_dead(bool status);

void wait_for_message();
void start();
Expand All @@ -78,6 +82,7 @@ namespace xeus

private:
void start_iopub_thread();
void start_heartbeat_thread();
void poll(long timeout);

using authentication_ptr = std::unique_ptr<xauthentication>;
Expand All @@ -86,6 +91,10 @@ namespace xeus
xdealer_channel m_shell_client;
xdealer_channel m_control_client;
xiopub_client m_iopub_client;
xheartbeat_client m_heartbeat_client;

const std::size_t m_max_retry = 3;
const long m_heartbeat_timeout = std::chrono::milliseconds(90).count();

xclient_messenger p_messenger;

Expand All @@ -96,8 +105,10 @@ namespace xeus
listener m_iopub_listener;

iopub_client_ptr p_iopub_client;
heartbeat_client_ptr p_heartbeat_client;

xthread m_iopub_thread;
xthread m_heartbeat_thread;
};
}

Expand Down
3 changes: 0 additions & 3 deletions src/xdealer_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
#include "zmq.hpp"
#include "zmq_addon.hpp"

#include "nlohmann/json.hpp"
#include "xeus/xkernel_configuration.hpp"

namespace xeus
{

Expand Down
81 changes: 81 additions & 0 deletions src/xheartbeat_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/***************************************************************************
* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou *
* Copyright (c) 2016, QuantStack *
* *
* Distributed under the terms of the BSD 3-Clause License. *
* *
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include "xheartbeat_client.hpp"
#include "xclient_zmq_impl.hpp"

#include "xeus-zmq/xmiddleware.hpp"

namespace xeus
{

xheartbeat_client::xheartbeat_client(zmq::context_t& context,
const xeus::xconfiguration& config,
const std::size_t max_retry,
const long timeout)
: m_heartbeat(context, zmq::socket_type::req)
, m_controller(context, zmq::socket_type::rep)
, m_max_retry(max_retry)
, m_heartbeat_timeout(timeout)
{
m_heartbeat.connect(get_end_point(config.m_transport, config.m_ip, config.m_hb_port));
init_socket(m_controller, get_controller_end_point("heartbeat"));
}

xheartbeat_client::~xheartbeat_client()
{
}

void xheartbeat_client::send_heartbeat_message()
{
zmq::message_t ping_msg("ping", 4);
m_heartbeat.send(ping_msg, zmq::send_flags::none);
}

bool xheartbeat_client::wait_for_answer(long timeout)
{
m_heartbeat.set(zmq::sockopt::linger, static_cast<int>(timeout));
zmq::message_t response;
return m_heartbeat.recv(response).has_value();
}

void xheartbeat_client::register_kernel_status_listener(const kernel_status_listener& l)
{
m_kernel_status_listener = l;
}

void xheartbeat_client::notify_kernel_dead(bool status)
{
m_kernel_status_listener(status);
}

void xheartbeat_client::run()
{
bool stop = false;
std::size_t retry_count = 0;

while(!stop)
{
send_heartbeat_message();
if(!wait_for_answer(m_heartbeat_timeout))
{
if (retry_count < m_max_retry)
{
++retry_count;
} else {
notify_kernel_dead(true);
stop = true;
}
} else {
retry_count = 0;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
52 changes: 52 additions & 0 deletions src/xheartbeat_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/***************************************************************************
* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou *
* Copyright (c) 2016, QuantStack *
* *
* Distributed under the terms of the BSD 3-Clause License. *
* *
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#ifndef XEUS_HEARTBEAT_CLIENT_HPP
#define XEUS_HEARTBEAT_CLIENT_HPP

#include <functional>

#include "zmq.hpp"

#include "xeus/xkernel_configuration.hpp"

namespace xeus
{
class xheartbeat_client
{
public:

using kernel_status_listener = std::function<void(bool)>;

xheartbeat_client(zmq::context_t& context,
const xeus::xconfiguration& config,
const std::size_t max_retry,
const long timeout);

~xheartbeat_client();

void run();

void register_kernel_status_listener(const kernel_status_listener& l);
void notify_kernel_dead(bool status);

private:
void send_heartbeat_message();
bool wait_for_answer(long timeout);

zmq::socket_t m_heartbeat;
zmq::socket_t m_controller;

kernel_status_listener m_kernel_status_listener;
const std::size_t m_max_retry;
const long m_heartbeat_timeout;
};
}

#endif
5 changes: 0 additions & 5 deletions src/xiopub_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@
#include <mutex>

#include "zmq.hpp"
#include "nlohmann/json.hpp"

#include "xeus/xmessage.hpp"
#include "xeus/xeus_context.hpp"
#include "xeus/xkernel_configuration.hpp"

#include "xeus-zmq/xthread.hpp"
#include "xeus-zmq/xmiddleware.hpp"

namespace xeus
{
class xclient_zmq_impl;
Expand Down

0 comments on commit 42a6f73

Please sign in to comment.