diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp index 1397c3b..186210b 100644 --- a/include/xeus-zmq/xclient_zmq.hpp +++ b/include/xeus-zmq/xclient_zmq.hpp @@ -34,31 +34,35 @@ namespace xeus explicit xclient_zmq(std::unique_ptr impl); ~xclient_zmq(); + + void connect(); + void start(); + void stop_channels(); void send_on_shell(xmessage msg); void send_on_control(xmessage msg); - std::optional check_shell_answer(); - std::optional check_control_answer(); + // APIs for receiving on a specified channel + std::optional receive_on_shell(bool bocking = true); + std::optional receive_on_control(bool blocking = true); + + std::size_t iopub_queue_size() const; + std::optional pop_iopub_message(); + // APIs for receiving on all channels void register_shell_listener(const listener& l); void register_control_listener(const listener& l); void register_iopub_listener(const iopub_listener& l); void register_kernel_status_listener(const kernel_status_listener& l); - void notify_shell_listener(xmessage msg); - void notify_control_listener(xmessage msg); - void notify_iopub_listener(xpub_message msg); - void notify_kernel_dead(bool status); - - std::size_t iopub_queue_size() const; - std::optional pop_iopub_message(); - void connect(); - void stop_channels(); - void start(); void wait_for_message(); private: + + void notify_shell_listener(xmessage msg); + void notify_control_listener(xmessage msg); + void notify_iopub_listener(xpub_message msg); + std::unique_ptr p_client_impl; }; @@ -68,4 +72,4 @@ namespace xeus nl::json::error_handler_t eh = nl::json::error_handler_t::strict); } -#endif \ No newline at end of file +#endif diff --git a/src/client/xclient_zmq.cpp b/src/client/xclient_zmq.cpp index cedc9c2..365e395 100644 --- a/src/client/xclient_zmq.cpp +++ b/src/client/xclient_zmq.cpp @@ -22,6 +22,20 @@ namespace xeus // types are used in unique_ptr in the header xclient_zmq::~xclient_zmq() = default; + void xclient_zmq::connect() + { + p_client_impl->connect(); + } + + void xclient_zmq::start() + { + p_client_impl->start(); + } + + void xclient_zmq::stop_channels() + { + p_client_impl->stop_channels(); + } void xclient_zmq::send_on_shell(xmessage msg) { @@ -33,14 +47,24 @@ namespace xeus p_client_impl->send_on_control(std::move(msg)); } - std::optional xclient_zmq::check_shell_answer() + std::optional xclient_zmq::receive_on_shell(bool blocking) + { + return p_client_impl->receive_on_shell(blocking); + } + + std::optional xclient_zmq::receive_on_control(bool blocking) + { + return p_client_impl->receive_on_control(blocking); + } + + std::size_t xclient_zmq::iopub_queue_size() const { - return p_client_impl->receive_on_shell(0); + return p_client_impl->iopub_queue_size(); } - std::optional xclient_zmq::check_control_answer() + std::optional xclient_zmq::pop_iopub_message() { - return p_client_impl->receive_on_control(0); + return p_client_impl->pop_iopub_message(); } void xclient_zmq::register_shell_listener(const listener& l) @@ -63,6 +87,11 @@ namespace xeus p_client_impl->register_kernel_status_listener(l); } + void xclient_zmq::wait_for_message() + { + p_client_impl->wait_for_message(); + } + void xclient_zmq::notify_shell_listener(xmessage msg) { p_client_impl->notify_shell_listener(std::move(msg)); @@ -77,42 +106,7 @@ namespace xeus { p_client_impl->notify_iopub_listener(std::move(msg)); } - - void xclient_zmq::notify_kernel_dead(bool status) - { - p_client_impl->notify_kernel_dead(status); - } - - std::size_t xclient_zmq::iopub_queue_size() const - { - return p_client_impl->iopub_queue_size(); - } - - std::optional xclient_zmq::pop_iopub_message() - { - return p_client_impl->pop_iopub_message(); - } - - void xclient_zmq::connect() - { - p_client_impl->connect(); - } - - void xclient_zmq::stop_channels() - { - p_client_impl->stop_channels(); - } - - void xclient_zmq::start() - { - p_client_impl->start(); - } - - void xclient_zmq::wait_for_message() - { - p_client_impl->wait_for_message(); - } - + std::unique_ptr make_xclient_zmq(xcontext& context, const xconfiguration& config, nl::json::error_handler_t eh) diff --git a/src/client/xclient_zmq_impl.cpp b/src/client/xclient_zmq_impl.cpp index f4be177..9190762 100644 --- a/src/client/xclient_zmq_impl.cpp +++ b/src/client/xclient_zmq_impl.cpp @@ -50,26 +50,30 @@ namespace xeus m_control_client.send_message(wire_msg); } - std::optional xclient_zmq_impl::receive_on_shell(long timeout) + std::optional xclient_zmq_impl::receive_on_shell(bool blocking) { - std::optional wire_msg = m_shell_client.receive_message(timeout); + std::optional wire_msg = m_shell_client.receive_message(blocking); if (wire_msg.has_value()) { return deserialize(wire_msg.value()); - } else { + } + else + { return std::nullopt; } } - std::optional xclient_zmq_impl::receive_on_control(long timeout) + std::optional xclient_zmq_impl::receive_on_control(bool blocking) { - std::optional wire_msg = m_control_client.receive_message(timeout); + std::optional wire_msg = m_control_client.receive_message(blocking); if (wire_msg.has_value()) { return deserialize(wire_msg.value()); - } else { + } + else + { return std::nullopt; } } diff --git a/src/client/xclient_zmq_impl.hpp b/src/client/xclient_zmq_impl.hpp index f733865..11fa286 100644 --- a/src/client/xclient_zmq_impl.hpp +++ b/src/client/xclient_zmq_impl.hpp @@ -51,12 +51,12 @@ namespace xeus // shell channel void send_on_shell(xmessage msg); - std::optional receive_on_shell(long timeout); + std::optional receive_on_shell(bool blocking); void register_shell_listener(const listener& l); // control channel void send_on_control(xmessage msg); - std::optional receive_on_control(long timeout); + std::optional receive_on_control(bool blocking); void register_control_listener(const listener& l); // iopub channel @@ -83,6 +83,7 @@ namespace xeus xpub_message deserialize_iopub(zmq::multipart_t& wire_msg) const; private: + void start_iopub_thread(); void start_heartbeat_thread(); void poll(long timeout); diff --git a/src/client/xdealer_channel.cpp b/src/client/xdealer_channel.cpp index 57dc892..85629c8 100644 --- a/src/client/xdealer_channel.cpp +++ b/src/client/xdealer_channel.cpp @@ -35,12 +35,12 @@ namespace xeus message.send(m_socket); } - std::optional xdealer_channel::receive_message(long timeout) + std::optional xdealer_channel::receive_message(bool blocking) { zmq::multipart_t wire_msg; zmq::recv_flags flags = zmq::recv_flags::none; - if (timeout == 0) + if (!blocking) { flags = zmq::recv_flags::dontwait; } @@ -48,7 +48,9 @@ namespace xeus if (wire_msg.recv(m_socket, static_cast(flags))) { return wire_msg; - } else { + } + else + { return std::nullopt; } } @@ -57,4 +59,4 @@ namespace xeus { return m_socket; } -} \ No newline at end of file +} diff --git a/src/client/xdealer_channel.hpp b/src/client/xdealer_channel.hpp index 871f2e8..917e2bf 100644 --- a/src/client/xdealer_channel.hpp +++ b/src/client/xdealer_channel.hpp @@ -28,15 +28,15 @@ namespace xeus ~xdealer_channel(); void send_message(zmq::multipart_t& message); - std::optional receive_message(long timeout); + std::optional receive_message(bool blocking); zmq::socket_t& get_socket(); private: - zmq::socket_t m_socket; + zmq::socket_t m_socket; std::string m_dealer_end_point; }; } -#endif \ No newline at end of file +#endif diff --git a/test/client_ipc.cpp b/test/client_ipc.cpp index 4f74dfb..fbf8bf9 100644 --- a/test/client_ipc.cpp +++ b/test/client_ipc.cpp @@ -52,7 +52,7 @@ int main(int, char**) ipc_client.send_on_shell(std::move(msg)); - auto response = ipc_client.check_shell_answer(); + auto response = ipc_client.receive_on_shell(false); if (response.has_value()) { std::cout << response->content().dump(4) << std::endl; diff --git a/test/xipc_client.cpp b/test/xipc_client.cpp index 6521a9a..f6f5961 100644 --- a/test/xipc_client.cpp +++ b/test/xipc_client.cpp @@ -21,8 +21,8 @@ namespace xeus p_client->send_on_shell(std::move(msg)); } - std::optional xipc_client::check_shell_answer() + std::optional xipc_client::receive_on_shell(bool blocking) { - return p_client->check_shell_answer(); + return p_client->receive_on_shell(blocking); } -} \ No newline at end of file +} diff --git a/test/xipc_client.hpp b/test/xipc_client.hpp index 62779f0..e814e4b 100644 --- a/test/xipc_client.hpp +++ b/test/xipc_client.hpp @@ -21,7 +21,7 @@ namespace xeus xipc_client(xcontext& context, const xconfiguration& config); void send_on_shell(xmessage msg); - std::optional check_shell_answer(); + std::optional receive_on_shell(bool blocking = true); private: client_ptr p_client;