Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved client APIs #71

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions include/xeus-zmq/xclient_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,35 @@ namespace xeus

explicit xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl);
~xclient_zmq();

void connect();
void start();
void stop_channels();

void send_on_shell(xmessage msg);
void send_on_control(xmessage msg);

std::optional<xmessage> check_shell_answer();
std::optional<xmessage> check_control_answer();
// APIs for receiving on a specified channel
std::optional<xmessage> receive_on_shell(bool bocking = true);
std::optional<xmessage> receive_on_control(bool blocking = true);

std::size_t iopub_queue_size() const;
std::optional<xpub_message> pop_iopub_message();

// APIs for receiving on all channels
void register_shell_listener(const listener& l);
void register_control_listener(const listener& l);
void register_iopub_listener(const iopub_listener& l);
void register_kernel_status_listener(const kernel_status_listener& l);

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xpub_message msg);
void notify_kernel_dead(bool status);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should notify_kernel_dead be dropped here ?


std::size_t iopub_queue_size() const;
std::optional<xpub_message> pop_iopub_message();
void connect();
void stop_channels();
void start();
void wait_for_message();

private:

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xpub_message msg);

std::unique_ptr<xclient_zmq_impl> p_client_impl;
};

Expand All @@ -68,4 +72,4 @@ namespace xeus
nl::json::error_handler_t eh = nl::json::error_handler_t::strict);
}

#endif
#endif
74 changes: 34 additions & 40 deletions src/client/xclient_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ namespace xeus
// types are used in unique_ptr in the header
xclient_zmq::~xclient_zmq() = default;

void xclient_zmq::connect()
{
p_client_impl->connect();
}

void xclient_zmq::start()
{
p_client_impl->start();
}

void xclient_zmq::stop_channels()
{
p_client_impl->stop_channels();
}

void xclient_zmq::send_on_shell(xmessage msg)
{
Expand All @@ -33,14 +47,24 @@ namespace xeus
p_client_impl->send_on_control(std::move(msg));
}

std::optional<xmessage> xclient_zmq::check_shell_answer()
std::optional<xmessage> xclient_zmq::receive_on_shell(bool blocking)
{
return p_client_impl->receive_on_shell(blocking);
}

std::optional<xmessage> xclient_zmq::receive_on_control(bool blocking)
{
return p_client_impl->receive_on_control(blocking);
}

std::size_t xclient_zmq::iopub_queue_size() const
{
return p_client_impl->receive_on_shell(0);
return p_client_impl->iopub_queue_size();
}

std::optional<xmessage> xclient_zmq::check_control_answer()
std::optional<xpub_message> xclient_zmq::pop_iopub_message()
{
return p_client_impl->receive_on_control(0);
return p_client_impl->pop_iopub_message();
}

void xclient_zmq::register_shell_listener(const listener& l)
Expand All @@ -63,6 +87,11 @@ namespace xeus
p_client_impl->register_kernel_status_listener(l);
}

void xclient_zmq::wait_for_message()
{
p_client_impl->wait_for_message();
}

void xclient_zmq::notify_shell_listener(xmessage msg)
{
p_client_impl->notify_shell_listener(std::move(msg));
Expand All @@ -77,42 +106,7 @@ namespace xeus
{
p_client_impl->notify_iopub_listener(std::move(msg));
}

void xclient_zmq::notify_kernel_dead(bool status)
{
p_client_impl->notify_kernel_dead(status);
}

std::size_t xclient_zmq::iopub_queue_size() const
{
return p_client_impl->iopub_queue_size();
}

std::optional<xpub_message> xclient_zmq::pop_iopub_message()
{
return p_client_impl->pop_iopub_message();
}

void xclient_zmq::connect()
{
p_client_impl->connect();
}

void xclient_zmq::stop_channels()
{
p_client_impl->stop_channels();
}

void xclient_zmq::start()
{
p_client_impl->start();
}

void xclient_zmq::wait_for_message()
{
p_client_impl->wait_for_message();
}


std::unique_ptr<xclient_zmq> make_xclient_zmq(xcontext& context,
const xconfiguration& config,
nl::json::error_handler_t eh)
Expand Down
16 changes: 10 additions & 6 deletions src/client/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,30 @@ namespace xeus
m_control_client.send_message(wire_msg);
}

std::optional<xmessage> xclient_zmq_impl::receive_on_shell(long timeout)
std::optional<xmessage> xclient_zmq_impl::receive_on_shell(bool blocking)
{
std::optional<zmq::multipart_t> wire_msg = m_shell_client.receive_message(timeout);
std::optional<zmq::multipart_t> wire_msg = m_shell_client.receive_message(blocking);

if (wire_msg.has_value())
{
return deserialize(wire_msg.value());
} else {
}
else
{
return std::nullopt;
}
}

std::optional<xmessage> xclient_zmq_impl::receive_on_control(long timeout)
std::optional<xmessage> xclient_zmq_impl::receive_on_control(bool blocking)
{
std::optional<zmq::multipart_t> wire_msg = m_control_client.receive_message(timeout);
std::optional<zmq::multipart_t> wire_msg = m_control_client.receive_message(blocking);

if (wire_msg.has_value())
{
return deserialize(wire_msg.value());
} else {
}
else
{
return std::nullopt;
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/client/xclient_zmq_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ namespace xeus

// shell channel
void send_on_shell(xmessage msg);
std::optional<xmessage> receive_on_shell(long timeout);
std::optional<xmessage> receive_on_shell(bool blocking);
void register_shell_listener(const listener& l);

// control channel
void send_on_control(xmessage msg);
std::optional<xmessage> receive_on_control(long timeout);
std::optional<xmessage> receive_on_control(bool blocking);
void register_control_listener(const listener& l);

// iopub channel
Expand All @@ -83,6 +83,7 @@ namespace xeus
xpub_message deserialize_iopub(zmq::multipart_t& wire_msg) const;

private:

void start_iopub_thread();
void start_heartbeat_thread();
void poll(long timeout);
Expand Down
10 changes: 6 additions & 4 deletions src/client/xdealer_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,22 @@ namespace xeus
message.send(m_socket);
}

std::optional<zmq::multipart_t> xdealer_channel::receive_message(long timeout)
std::optional<zmq::multipart_t> xdealer_channel::receive_message(bool blocking)
{
zmq::multipart_t wire_msg;
zmq::recv_flags flags = zmq::recv_flags::none;

if (timeout == 0)
if (!blocking)
{
flags = zmq::recv_flags::dontwait;
}

if (wire_msg.recv(m_socket, static_cast<int>(flags)))
{
return wire_msg;
} else {
}
else
{
return std::nullopt;
}
}
Expand All @@ -57,4 +59,4 @@ namespace xeus
{
return m_socket;
}
}
}
6 changes: 3 additions & 3 deletions src/client/xdealer_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ namespace xeus
~xdealer_channel();

void send_message(zmq::multipart_t& message);
std::optional<zmq::multipart_t> receive_message(long timeout);
std::optional<zmq::multipart_t> receive_message(bool blocking);

zmq::socket_t& get_socket();

private:
zmq::socket_t m_socket;

zmq::socket_t m_socket;
std::string m_dealer_end_point;
};
}

#endif
#endif
2 changes: 1 addition & 1 deletion test/client_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ int main(int, char**)

ipc_client.send_on_shell(std::move(msg));

auto response = ipc_client.check_shell_answer();
auto response = ipc_client.receive_on_shell(false);
if (response.has_value())
{
std::cout << response->content().dump(4) << std::endl;
Expand Down
6 changes: 3 additions & 3 deletions test/xipc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace xeus
p_client->send_on_shell(std::move(msg));
}

std::optional<xmessage> xipc_client::check_shell_answer()
std::optional<xmessage> xipc_client::receive_on_shell(bool blocking)
{
return p_client->check_shell_answer();
return p_client->receive_on_shell(blocking);
}
}
}
2 changes: 1 addition & 1 deletion test/xipc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace xeus
xipc_client(xcontext& context, const xconfiguration& config);

void send_on_shell(xmessage msg);
std::optional<xmessage> check_shell_answer();
std::optional<xmessage> receive_on_shell(bool blocking = true);

private:
client_ptr p_client;
Expand Down