diff --git a/CMakeLists.txt b/CMakeLists.txt
index be7c81b..688d213 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -174,6 +174,8 @@ set(XEUS_ZMQ_SOURCES
     ${XEUS_ZMQ_SOURCE_DIR}/xdealer_channel.cpp
     ${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.hpp
     ${XEUS_ZMQ_SOURCE_DIR}/xiopub_client.cpp
+    ${XEUS_ZMQ_SOURCE_DIR}/xheartbeat_client.hpp
+    ${XEUS_ZMQ_SOURCE_DIR}/xheartbeat_client.cpp
 )
 
 # Targets and link
diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp
index 9eed125..2194938 100644
--- a/include/xeus-zmq/xclient_zmq.hpp
+++ b/include/xeus-zmq/xclient_zmq.hpp
@@ -29,6 +29,7 @@ namespace xeus
     public:
 
         using listener = std::function<void(xmessage)>;
+        using kernel_status_listener = std::function<void(bool)>;
 
         explicit xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl);
         ~xclient_zmq();
@@ -42,10 +43,12 @@ namespace xeus
         void register_shell_listener(const listener& l);
         void register_control_listener(const listener& l);
         void register_iopub_listener(const listener& l);
+        void register_kernel_status_listener(const kernel_status_listener& l);
 
         void notify_shell_listener(xmessage msg);
         void notify_control_listener(xmessage msg);
         void notify_iopub_listener(xmessage msg);
+        void notify_kernel_dead(bool status);
 
         std::size_t iopub_queue_size() const;
         std::optional<xmessage> pop_iopub_message();
diff --git a/src/xclient_zmq.cpp b/src/xclient_zmq.cpp
index ad9dc54..61fa71d 100644
--- a/src/xclient_zmq.cpp
+++ b/src/xclient_zmq.cpp
@@ -58,6 +58,11 @@ namespace xeus
         p_client_impl->register_iopub_listener(l);
     }
 
+    void xclient_zmq::register_kernel_status_listener(const kernel_status_listener& l)
+    {
+        p_client_impl->register_kernel_status_listener(l);
+    }
+
     void xclient_zmq::notify_shell_listener(xmessage msg)
     {
         p_client_impl->notify_shell_listener(std::move(msg));
@@ -73,6 +78,11 @@ namespace xeus
         p_client_impl->notify_iopub_listener(std::move(msg));
     }
 
+    void xclient_zmq::notify_kernel_dead(bool status)
+    {
+        p_client_impl->notify_kernel_dead(status);
+    }
+
     std::size_t xclient_zmq::iopub_queue_size() const
     {
         return p_client_impl->iopub_queue_size();
diff --git a/src/xclient_zmq_impl.cpp b/src/xclient_zmq_impl.cpp
--- a/src/xclient_zmq_impl.cpp
+++ b/src/xclient_zmq_impl.cpp
@@ -23,6 +23,7 @@ namespace xeus
         , m_shell_client(context, config.m_transport, config.m_ip, config.m_shell_port)
         , m_control_client(context, config.m_transport, config.m_ip, config.m_control_port)
         , m_iopub_client(context, config)
+        , m_heartbeat_client(context, config, m_max_retry, m_heartbeat_timeout)
         , p_messenger(context)
         , m_error_handler(eh)
     {
@@ -93,6 +94,11 @@ namespace xeus
         m_iopub_listener = l;
     }
 
+    void xclient_zmq_impl::register_kernel_status_listener(const kernel_status_listener& l)
+    {
+        m_heartbeat_client.register_kernel_status_listener(l);
+    }
+
     void xclient_zmq_impl::connect()
     {
         p_messenger.connect();
@@ -118,6 +124,11 @@ namespace xeus
         m_iopub_listener(std::move(msg));
     }
 
+    void xclient_zmq_impl::notify_kernel_dead(bool status)
+    {
+        m_heartbeat_client.notify_kernel_dead(status);
+    }
+
     void xclient_zmq_impl::poll(long timeout)
     {
         zmq::multipart_t wire_msg;
@@ -166,7 +177,7 @@ namespace xeus
     void xclient_zmq_impl::start()
     {
         start_iopub_thread();
-        // TODO : Introduce a client, xheartbeat_client that runs on its own thread, m_heartbeat_thread.
+        start_heartbeat_thread();
     }
 
     void xclient_zmq_impl::start_iopub_thread()
@@ -174,6 +185,13 @@ namespace xeus
         m_iopub_thread = std::move(xthread(&xiopub_client::run, p_iopub_client.get()));
     }
 
+    void xclient_zmq_impl::start_heartbeat_thread()
+    {
+        // Run the heartbeat loop on the client owned by value (m_heartbeat_client),
+        // i.e. the same object the kernel status listener is registered on.
+        m_heartbeat_thread = xthread(&xheartbeat_client::run, &m_heartbeat_client);
+    }
+
     xmessage xclient_zmq_impl::deserialize(zmq::multipart_t& wire_msg) const
     {
         return xzmq_serializer::deserialize(wire_msg, *p_auth);
diff --git a/src/xclient_zmq_impl.hpp b/src/xclient_zmq_impl.hpp
--- a/src/xclient_zmq_impl.hpp
+++ b/src/xclient_zmq_impl.hpp
@@ -21,6 +21,7 @@
 
 #include "xdealer_channel.hpp"
 #include "xiopub_client.hpp"
+#include "xheartbeat_client.hpp"
 #include "xclient_messenger.hpp"
 
 namespace xeus
@@ -31,7 +32,8 @@ namespace xeus
     {
     public:
         using iopub_client_ptr = std::unique_ptr<xiopub_client>;
         using listener = std::function<void(xmessage)>;
+        using kernel_status_listener = std::function<void(bool)>;
 
         xclient_zmq_impl(zmq::context_t& context,
                          const xconfiguration& config,
@@ -60,8 +62,8 @@ namespace xeus
         std::optional<xmessage> pop_iopub_message();
         void register_iopub_listener(const listener& l);
 
-        // hearbeat channel
-        // TODO
+        // heartbeat channel
+        void register_kernel_status_listener(const kernel_status_listener& l);
 
         // client messenger
         void connect();
@@ -70,6 +72,7 @@ namespace xeus
         void notify_shell_listener(xmessage msg);
         void notify_control_listener(xmessage msg);
         void notify_iopub_listener(xmessage msg);
+        void notify_kernel_dead(bool status);
 
         void wait_for_message();
         void start();
@@ -78,6 +81,7 @@ namespace xeus
 
     private:
         void start_iopub_thread();
+        void start_heartbeat_thread();
         void poll(long timeout);
 
         using authentication_ptr = std::unique_ptr<xauthentication>;
@@ -86,6 +90,12 @@ namespace xeus
         xdealer_channel m_shell_client;
         xdealer_channel m_control_client;
         xiopub_client m_iopub_client;
+
+        // Declared before m_heartbeat_client on purpose: members are initialized
+        // in declaration order and the constructor forwards both values to it.
+        const std::size_t m_max_retry = 3;
+        const long m_heartbeat_timeout = std::chrono::milliseconds(90).count();
+        xheartbeat_client m_heartbeat_client;
 
         xclient_messenger p_messenger;
 
@@ -96,8 +106,9 @@ namespace xeus
         listener m_iopub_listener;
 
         iopub_client_ptr p_iopub_client;
 
         xthread m_iopub_thread;
+        xthread m_heartbeat_thread;
     };
 }
 
diff --git a/src/xdealer_channel.hpp b/src/xdealer_channel.hpp
index 4893dc2..bcb591a 100644
--- a/src/xdealer_channel.hpp
+++ b/src/xdealer_channel.hpp
@@ -13,9 +13,6 @@
 #include "zmq.hpp"
 #include "zmq_addon.hpp"
 
-#include "nlohmann/json.hpp"
-#include "xeus/xkernel_configuration.hpp"
-
 namespace xeus
 {
 
diff --git a/src/xheartbeat_client.cpp b/src/xheartbeat_client.cpp
new file mode 100644
--- /dev/null
+++ b/src/xheartbeat_client.cpp
@@ -0,0 +1,108 @@
+/***************************************************************************
+* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou          *
+* Copyright (c) 2016, QuantStack                                           *
+*                                                                          *
+* Distributed under the terms of the BSD 3-Clause License.                 *
+*                                                                          *
+* The full license is in the file LICENSE, distributed with this software. *
+****************************************************************************/
+
+#include <chrono>
+#include <thread>
+
+#include "xheartbeat_client.hpp"
+#include "xclient_zmq_impl.hpp"
+
+#include "xeus-zmq/xmiddleware.hpp"
+
+namespace xeus
+{
+
+    xheartbeat_client::xheartbeat_client(zmq::context_t& context,
+                                         const xeus::xconfiguration& config,
+                                         const std::size_t max_retry,
+                                         const long timeout)
+        : m_heartbeat(context, zmq::socket_type::req)
+        , m_controller(context, zmq::socket_type::rep)
+        , m_max_retry(max_retry)
+        , m_heartbeat_timeout(timeout)
+    {
+        // A REQ socket normally rejects a second send() until the pending reply
+        // has been received. Relaxed mode lets run() ping again after a wait that
+        // timed out; correlation makes the socket discard replies to older pings.
+        m_heartbeat.set(zmq::sockopt::req_relaxed, 1);
+        m_heartbeat.set(zmq::sockopt::req_correlate, 1);
+        // Bound how long unanswered pings may delay closing the socket.
+        m_heartbeat.set(zmq::sockopt::linger, static_cast<int>(timeout));
+        m_heartbeat.connect(get_end_point(config.m_transport, config.m_ip, config.m_hb_port));
+        init_socket(m_controller, get_controller_end_point("heartbeat"));
+    }
+
+    xheartbeat_client::~xheartbeat_client()
+    {
+    }
+
+    // Sends one "ping" frame to the kernel heartbeat endpoint.
+    void xheartbeat_client::send_heartbeat_message()
+    {
+        zmq::message_t ping_msg("ping", 4);
+        m_heartbeat.send(ping_msg, zmq::send_flags::none);
+    }
+
+    // Waits at most `timeout` milliseconds for the reply to the last ping.
+    // Returns true when a reply was received, false when the wait timed out.
+    bool xheartbeat_client::wait_for_answer(long timeout)
+    {
+        // recv() blocks indefinitely unless a receive timeout is set; with
+        // rcvtimeo it returns an empty result once the timeout expires.
+        m_heartbeat.set(zmq::sockopt::rcvtimeo, static_cast<int>(timeout));
+        zmq::message_t response;
+        return m_heartbeat.recv(response).has_value();
+    }
+
+    void xheartbeat_client::register_kernel_status_listener(const kernel_status_listener& l)
+    {
+        m_kernel_status_listener = l;
+    }
+
+    // Forwards `status` to the registered listener, if any.
+    void xheartbeat_client::notify_kernel_dead(bool status)
+    {
+        // Calling an empty std::function throws std::bad_function_call, which
+        // would escape from the heartbeat thread; skip when nothing is registered.
+        if (m_kernel_status_listener)
+        {
+            m_kernel_status_listener(status);
+        }
+    }
+
+    // Heartbeat loop: ping, wait for the reply, and report the kernel as dead
+    // after more than m_max_retry consecutive unanswered pings.
+    void xheartbeat_client::run()
+    {
+        bool stop = false;
+        std::size_t retry_count = 0;
+
+        while (!stop)
+        {
+            send_heartbeat_message();
+            if (!wait_for_answer(m_heartbeat_timeout))
+            {
+                if (retry_count < m_max_retry)
+                {
+                    ++retry_count;
+                }
+                else
+                {
+                    notify_kernel_dead(true);
+                    stop = true;
+                }
+            }
+            else
+            {
+                retry_count = 0;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        }
+    }
+}
diff --git a/src/xheartbeat_client.hpp b/src/xheartbeat_client.hpp
new file mode 100644
--- /dev/null
+++ b/src/xheartbeat_client.hpp
@@ -0,0 +1,60 @@
+/***************************************************************************
+* Copyright (c) 2016, Johan Mabille, Sylvain Corlay, Martin Renou          *
+* Copyright (c) 2016, QuantStack                                           *
+*                                                                          *
+* Distributed under the terms of the BSD 3-Clause License.                 *
+*                                                                          *
+* The full license is in the file LICENSE, distributed with this software. *
+****************************************************************************/
+
+#ifndef XEUS_HEARTBEAT_CLIENT_HPP
+#define XEUS_HEARTBEAT_CLIENT_HPP
+
+#include <cstddef>
+#include <functional>
+
+#include "zmq.hpp"
+
+#include "xeus/xkernel_configuration.hpp"
+
+namespace xeus
+{
+    // Client side of the heartbeat channel: pings the kernel periodically and
+    // tells a listener when the kernel stops answering.
+    class xheartbeat_client
+    {
+    public:
+
+        // Called with the status passed to notify_kernel_dead (run() passes true).
+        using kernel_status_listener = std::function<void(bool)>;
+
+        // max_retry: consecutive unanswered pings tolerated before the kernel
+        //            is reported dead.
+        // timeout:   time to wait for each reply, in milliseconds.
+        xheartbeat_client(zmq::context_t& context,
+                          const xeus::xconfiguration& config,
+                          const std::size_t max_retry,
+                          const long timeout);
+
+        ~xheartbeat_client();
+
+        // Heartbeat loop; returns once the kernel has been reported dead.
+        void run();
+
+        void register_kernel_status_listener(const kernel_status_listener& l);
+        void notify_kernel_dead(bool status);
+
+    private:
+        void send_heartbeat_message();
+        bool wait_for_answer(long timeout);
+
+        zmq::socket_t m_heartbeat;
+        zmq::socket_t m_controller;
+
+        kernel_status_listener m_kernel_status_listener;
+        const std::size_t m_max_retry;
+        const long m_heartbeat_timeout;
+    };
+}
+
+#endif
diff --git a/src/xiopub_client.hpp b/src/xiopub_client.hpp
index f374e76..79689fb 100644
--- a/src/xiopub_client.hpp
+++ b/src/xiopub_client.hpp
@@ -15,14 +15,9 @@
 
 #include "zmq.hpp"
-#include "nlohmann/json.hpp"
 
 #include "xeus/xmessage.hpp"
-#include "xeus/xeus_context.hpp"
 #include "xeus/xkernel_configuration.hpp"
 
-#include "xeus-zmq/xthread.hpp"
-#include "xeus-zmq/xmiddleware.hpp"
-
 namespace xeus
 {
     class xclient_zmq_impl;