Skip to content

Commit

Permalink
Merge pull request #71 from JohanMabille/client_api
Browse files Browse the repository at this point in the history
Improved client APIs
  • Loading branch information
JohanMabille authored Jun 28, 2024
2 parents 3f50879 + f0c48aa commit 3142b07
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 73 deletions.
30 changes: 17 additions & 13 deletions include/xeus-zmq/xclient_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,35 @@ namespace xeus

explicit xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl);
~xclient_zmq();

void connect();
void start();
void stop_channels();

void send_on_shell(xmessage msg);
void send_on_control(xmessage msg);

std::optional<xmessage> check_shell_answer();
std::optional<xmessage> check_control_answer();
// APIs for receiving on a specified channel
std::optional<xmessage> receive_on_shell(bool bocking = true);
std::optional<xmessage> receive_on_control(bool blocking = true);

std::size_t iopub_queue_size() const;
std::optional<xpub_message> pop_iopub_message();

// APIs for receiving on all channels
void register_shell_listener(const listener& l);
void register_control_listener(const listener& l);
void register_iopub_listener(const iopub_listener& l);
void register_kernel_status_listener(const kernel_status_listener& l);

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xpub_message msg);
void notify_kernel_dead(bool status);

std::size_t iopub_queue_size() const;
std::optional<xpub_message> pop_iopub_message();
void connect();
void stop_channels();
void start();
void wait_for_message();

private:

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xpub_message msg);

std::unique_ptr<xclient_zmq_impl> p_client_impl;
};

Expand All @@ -68,4 +72,4 @@ namespace xeus
nl::json::error_handler_t eh = nl::json::error_handler_t::strict);
}

#endif
#endif
74 changes: 34 additions & 40 deletions src/client/xclient_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ namespace xeus
// types are used in unique_ptr in the header
xclient_zmq::~xclient_zmq() = default;

void xclient_zmq::connect()
{
p_client_impl->connect();
}

void xclient_zmq::start()
{
p_client_impl->start();
}

void xclient_zmq::stop_channels()
{
p_client_impl->stop_channels();
}

void xclient_zmq::send_on_shell(xmessage msg)
{
Expand All @@ -33,14 +47,24 @@ namespace xeus
p_client_impl->send_on_control(std::move(msg));
}

std::optional<xmessage> xclient_zmq::check_shell_answer()
std::optional<xmessage> xclient_zmq::receive_on_shell(bool blocking)
{
return p_client_impl->receive_on_shell(blocking);
}

std::optional<xmessage> xclient_zmq::receive_on_control(bool blocking)
{
return p_client_impl->receive_on_control(blocking);
}

std::size_t xclient_zmq::iopub_queue_size() const
{
return p_client_impl->receive_on_shell(0);
return p_client_impl->iopub_queue_size();
}

std::optional<xmessage> xclient_zmq::check_control_answer()
std::optional<xpub_message> xclient_zmq::pop_iopub_message()
{
return p_client_impl->receive_on_control(0);
return p_client_impl->pop_iopub_message();
}

void xclient_zmq::register_shell_listener(const listener& l)
Expand All @@ -63,6 +87,11 @@ namespace xeus
p_client_impl->register_kernel_status_listener(l);
}

void xclient_zmq::wait_for_message()
{
p_client_impl->wait_for_message();
}

void xclient_zmq::notify_shell_listener(xmessage msg)
{
p_client_impl->notify_shell_listener(std::move(msg));
Expand All @@ -77,42 +106,7 @@ namespace xeus
{
p_client_impl->notify_iopub_listener(std::move(msg));
}

void xclient_zmq::notify_kernel_dead(bool status)
{
p_client_impl->notify_kernel_dead(status);
}

std::size_t xclient_zmq::iopub_queue_size() const
{
return p_client_impl->iopub_queue_size();
}

std::optional<xpub_message> xclient_zmq::pop_iopub_message()
{
return p_client_impl->pop_iopub_message();
}

void xclient_zmq::connect()
{
p_client_impl->connect();
}

void xclient_zmq::stop_channels()
{
p_client_impl->stop_channels();
}

void xclient_zmq::start()
{
p_client_impl->start();
}

void xclient_zmq::wait_for_message()
{
p_client_impl->wait_for_message();
}


std::unique_ptr<xclient_zmq> make_xclient_zmq(xcontext& context,
const xconfiguration& config,
nl::json::error_handler_t eh)
Expand Down
16 changes: 10 additions & 6 deletions src/client/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,30 @@ namespace xeus
m_control_client.send_message(wire_msg);
}

std::optional<xmessage> xclient_zmq_impl::receive_on_shell(long timeout)
std::optional<xmessage> xclient_zmq_impl::receive_on_shell(bool blocking)
{
std::optional<zmq::multipart_t> wire_msg = m_shell_client.receive_message(timeout);
std::optional<zmq::multipart_t> wire_msg = m_shell_client.receive_message(blocking);

if (wire_msg.has_value())
{
return deserialize(wire_msg.value());
} else {
}
else
{
return std::nullopt;
}
}

std::optional<xmessage> xclient_zmq_impl::receive_on_control(long timeout)
std::optional<xmessage> xclient_zmq_impl::receive_on_control(bool blocking)
{
std::optional<zmq::multipart_t> wire_msg = m_control_client.receive_message(timeout);
std::optional<zmq::multipart_t> wire_msg = m_control_client.receive_message(blocking);

if (wire_msg.has_value())
{
return deserialize(wire_msg.value());
} else {
}
else
{
return std::nullopt;
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/client/xclient_zmq_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ namespace xeus

// shell channel
void send_on_shell(xmessage msg);
std::optional<xmessage> receive_on_shell(long timeout);
std::optional<xmessage> receive_on_shell(bool blocking);
void register_shell_listener(const listener& l);

// control channel
void send_on_control(xmessage msg);
std::optional<xmessage> receive_on_control(long timeout);
std::optional<xmessage> receive_on_control(bool blocking);
void register_control_listener(const listener& l);

// iopub channel
Expand All @@ -83,6 +83,7 @@ namespace xeus
xpub_message deserialize_iopub(zmq::multipart_t& wire_msg) const;

private:

void start_iopub_thread();
void start_heartbeat_thread();
void poll(long timeout);
Expand Down
10 changes: 6 additions & 4 deletions src/client/xdealer_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,22 @@ namespace xeus
message.send(m_socket);
}

std::optional<zmq::multipart_t> xdealer_channel::receive_message(long timeout)
std::optional<zmq::multipart_t> xdealer_channel::receive_message(bool blocking)
{
zmq::multipart_t wire_msg;
zmq::recv_flags flags = zmq::recv_flags::none;

if (timeout == 0)
if (!blocking)
{
flags = zmq::recv_flags::dontwait;
}

if (wire_msg.recv(m_socket, static_cast<int>(flags)))
{
return wire_msg;
} else {
}
else
{
return std::nullopt;
}
}
Expand All @@ -57,4 +59,4 @@ namespace xeus
{
return m_socket;
}
}
}
6 changes: 3 additions & 3 deletions src/client/xdealer_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ namespace xeus
~xdealer_channel();

void send_message(zmq::multipart_t& message);
std::optional<zmq::multipart_t> receive_message(long timeout);
std::optional<zmq::multipart_t> receive_message(bool blocking);

zmq::socket_t& get_socket();

private:
zmq::socket_t m_socket;

zmq::socket_t m_socket;
std::string m_dealer_end_point;
};
}

#endif
#endif
2 changes: 1 addition & 1 deletion test/client_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ int main(int, char**)

ipc_client.send_on_shell(std::move(msg));

auto response = ipc_client.check_shell_answer();
auto response = ipc_client.receive_on_shell(false);
if (response.has_value())
{
std::cout << response->content().dump(4) << std::endl;
Expand Down
6 changes: 3 additions & 3 deletions test/xipc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace xeus
p_client->send_on_shell(std::move(msg));
}

std::optional<xmessage> xipc_client::check_shell_answer()
std::optional<xmessage> xipc_client::receive_on_shell(bool blocking)
{
return p_client->check_shell_answer();
return p_client->receive_on_shell(blocking);
}
}
}
2 changes: 1 addition & 1 deletion test/xipc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace xeus
xipc_client(xcontext& context, const xconfiguration& config);

void send_on_shell(xmessage msg);
std::optional<xmessage> check_shell_answer();
std::optional<xmessage> receive_on_shell(bool blocking = true);

private:
client_ptr p_client;
Expand Down

0 comments on commit 3142b07

Please sign in to comment.