diff --git a/src/client/xclient_zmq_impl.cpp b/src/client/xclient_zmq_impl.cpp index 23b08df..f4be177 100644 --- a/src/client/xclient_zmq_impl.cpp +++ b/src/client/xclient_zmq_impl.cpp @@ -27,7 +27,7 @@ namespace xeus : p_auth(make_xauthentication(config.m_signature_scheme, config.m_key)) , m_shell_client(context, config.m_transport, config.m_ip, config.m_shell_port) , m_control_client(context, config.m_transport, config.m_ip, config.m_control_port) - , m_iopub_client(context, config) + , m_iopub_client(context, config, this) , m_heartbeat_client(context, config, max_retry, heartbeat_timeout) , p_messenger(context) , m_error_handler(eh) @@ -174,7 +174,9 @@ namespace xeus if (pending_message.has_value()) { notify_iopub_listener(std::move(*pending_message)); - } else { + } + else + { poll(-1); } } @@ -187,12 +189,12 @@ namespace xeus void xclient_zmq_impl::start_iopub_thread() { - m_iopub_thread = std::move(xthread(&xiopub_client::run, p_iopub_client.get())); + m_iopub_thread = std::move(xthread(&xiopub_client::run, &m_iopub_client)); } void xclient_zmq_impl::start_heartbeat_thread() { - m_heartbeat_thread = std::move(xthread(&xheartbeat_client::run, p_heartbeat_client.get())); + m_heartbeat_thread = std::move(xthread(&xheartbeat_client::run, &m_heartbeat_client)); } xmessage xclient_zmq_impl::deserialize(zmq::multipart_t& wire_msg) const diff --git a/src/client/xclient_zmq_impl.hpp b/src/client/xclient_zmq_impl.hpp index 12477b6..f733865 100644 --- a/src/client/xclient_zmq_impl.hpp +++ b/src/client/xclient_zmq_impl.hpp @@ -103,9 +103,6 @@ namespace xeus listener m_control_listener; iopub_listener m_iopub_listener; - iopub_client_ptr p_iopub_client; - heartbeat_client_ptr p_heartbeat_client; - xthread m_iopub_thread; xthread m_heartbeat_thread; }; diff --git a/src/client/xheartbeat_client.cpp b/src/client/xheartbeat_client.cpp index 5046ff4..f84e3e9 100644 --- a/src/client/xheartbeat_client.cpp +++ b/src/client/xheartbeat_client.cpp @@ -70,11 +70,15 @@ namespace xeus if (retry_count < m_max_retry) { ++retry_count; - } else { + } + else + { notify_kernel_dead(true); stop = true; } - } else { + } + else + { retry_count = 0; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); diff --git a/src/client/xiopub_client.cpp b/src/client/xiopub_client.cpp index 6213103..00158ef 100644 --- a/src/client/xiopub_client.cpp +++ b/src/client/xiopub_client.cpp @@ -18,13 +18,16 @@ namespace xeus { xiopub_client::xiopub_client(zmq::context_t& context, - const xeus::xconfiguration& config) + const xeus::xconfiguration& config, + xclient_zmq_impl* client) : m_iopub(context, zmq::socket_type::sub) , m_controller(context, zmq::socket_type::rep) , m_iopub_end_point("") + , p_client_impl(client) { m_iopub_end_point = get_end_point(config.m_transport, config.m_ip, config.m_iopub_port); m_iopub.connect(m_iopub_end_point); + m_iopub.set(zmq::sockopt::subscribe, ""); init_socket(m_controller, get_controller_end_point("iopub")); } @@ -47,7 +50,9 @@ namespace xeus xpub_message msg = std::move(m_message_queue.back()); m_message_queue.pop(); return msg; - } else { + } + else + { return std::nullopt; } } diff --git a/src/client/xiopub_client.hpp b/src/client/xiopub_client.hpp index 2acb731..6b04b06 100644 --- a/src/client/xiopub_client.hpp +++ b/src/client/xiopub_client.hpp @@ -27,7 +27,8 @@ namespace xeus public: xiopub_client(zmq::context_t& context, - const xeus::xconfiguration& config); + const xeus::xconfiguration& config, + xclient_zmq_impl* client); ~xiopub_client(); @@ -49,4 +50,4 @@ namespace xeus }; } -#endif \ No newline at end of file +#endif