Skip to content

Commit

Permalink
Merge pull request #64 from anutosh491/fix_iopub_client
Browse files Browse the repository at this point in the history
Fix run function for xiopub_client
  • Loading branch information
JohanMabille authored Jun 12, 2024
2 parents 63154b4 + 9d377f9 commit 310c30e
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 29 deletions.
7 changes: 4 additions & 3 deletions include/xeus-zmq/xclient_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace xeus
public:

using listener = std::function<void(xmessage)>;
using iopub_listener = std::function<void(xpub_message)>;
using kernel_status_listener = std::function<void(bool)>;

explicit xclient_zmq(std::unique_ptr<xclient_zmq_impl> impl);
Expand All @@ -42,16 +43,16 @@ namespace xeus

void register_shell_listener(const listener& l);
void register_control_listener(const listener& l);
void register_iopub_listener(const listener& l);
void register_iopub_listener(const iopub_listener& l);
void register_kernel_status_listener(const kernel_status_listener& l);

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xmessage msg);
void notify_iopub_listener(xpub_message msg);
void notify_kernel_dead(bool status);

std::size_t iopub_queue_size() const;
std::optional<xmessage> pop_iopub_message();
std::optional<xpub_message> pop_iopub_message();
void connect();
void stop_channels();
void start();
Expand Down
6 changes: 3 additions & 3 deletions src/client/xclient_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace xeus
p_client_impl->register_control_listener(l);
}

void xclient_zmq::register_iopub_listener(const listener& l)
void xclient_zmq::register_iopub_listener(const iopub_listener& l)
{
p_client_impl->register_iopub_listener(l);
}
Expand All @@ -73,7 +73,7 @@ namespace xeus
p_client_impl->notify_control_listener(std::move(msg));
}

void xclient_zmq::notify_iopub_listener(xmessage msg)
void xclient_zmq::notify_iopub_listener(xpub_message msg)
{
p_client_impl->notify_iopub_listener(std::move(msg));
}
Expand All @@ -88,7 +88,7 @@ namespace xeus
return p_client_impl->iopub_queue_size();
}

std::optional<xmessage> xclient_zmq::pop_iopub_message()
std::optional<xpub_message> xclient_zmq::pop_iopub_message()
{
return p_client_impl->pop_iopub_message();
}
Expand Down
13 changes: 9 additions & 4 deletions src/client/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ namespace xeus
return m_iopub_client.iopub_queue_size();
}

std::optional<xmessage> xclient_zmq_impl::pop_iopub_message()
std::optional<xpub_message> xclient_zmq_impl::pop_iopub_message()
{
return m_iopub_client.pop_iopub_message();
}

void xclient_zmq_impl::register_iopub_listener(const listener& l)
void xclient_zmq_impl::register_iopub_listener(const iopub_listener& l)
{
m_iopub_listener = l;
}
Expand Down Expand Up @@ -124,7 +124,7 @@ namespace xeus
m_control_listener(std::move(msg));
}

void xclient_zmq_impl::notify_iopub_listener(xmessage msg)
void xclient_zmq_impl::notify_iopub_listener(xpub_message msg)
{
m_iopub_listener(std::move(msg));
}
Expand Down Expand Up @@ -169,7 +169,7 @@ namespace xeus

void xclient_zmq_impl::wait_for_message()
{
std::optional<xmessage> pending_message = pop_iopub_message();
std::optional<xpub_message> pending_message = pop_iopub_message();

if (pending_message.has_value())
{
Expand Down Expand Up @@ -200,4 +200,9 @@ namespace xeus
return xzmq_serializer::deserialize(wire_msg, *p_auth);
}

xpub_message xclient_zmq_impl::deserialize_iopub(zmq::multipart_t& wire_msg) const
{
return xzmq_serializer::deserialize_iopub(wire_msg, *p_auth);
}

}
10 changes: 6 additions & 4 deletions src/client/xclient_zmq_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace xeus
using iopub_client_ptr = std::unique_ptr<xiopub_client>;
using heartbeat_client_ptr = std::unique_ptr<xheartbeat_client>;
using listener = std::function<void(xmessage)>;
using iopub_listener = std::function<void(xpub_message)>;
using kernel_status_listener = std::function<void(bool)>;

xclient_zmq_impl(zmq::context_t& context,
Expand All @@ -60,8 +61,8 @@ namespace xeus

// iopub channel
std::size_t iopub_queue_size() const;
std::optional<xmessage> pop_iopub_message();
void register_iopub_listener(const listener& l);
std::optional<xpub_message> pop_iopub_message();
void register_iopub_listener(const iopub_listener& l);

// heartbeat channel
void register_kernel_status_listener(const kernel_status_listener& l);
Expand All @@ -72,13 +73,14 @@ namespace xeus

void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xmessage msg);
void notify_iopub_listener(xpub_message msg);
void notify_kernel_dead(bool status);

void wait_for_message();
void start();

xmessage deserialize(zmq::multipart_t& wire_msg) const;
xpub_message deserialize_iopub(zmq::multipart_t& wire_msg) const;

private:
void start_iopub_thread();
Expand All @@ -99,7 +101,7 @@ namespace xeus

listener m_shell_listener;
listener m_control_listener;
listener m_iopub_listener;
iopub_listener m_iopub_listener;

iopub_client_ptr p_iopub_client;
heartbeat_client_ptr p_heartbeat_client;
Expand Down
36 changes: 23 additions & 13 deletions src/client/xiopub_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ namespace xeus
return m_message_queue.size();
}

std::optional<xmessage> xiopub_client::pop_iopub_message()
std::optional<xpub_message> xiopub_client::pop_iopub_message()
{
std::lock_guard<std::mutex> guard(m_queue_mutex);
if (!m_message_queue.empty())
{
xmessage msg = std::move(m_message_queue.back());
xpub_message msg = std::move(m_message_queue.back());
m_message_queue.pop();
return msg;
} else {
Expand All @@ -51,32 +51,42 @@ namespace xeus

void xiopub_client::run()
{
zmq::multipart_t wire_msg;
zmq::pollitem_t items[] = {
{ m_iopub, 0, ZMQ_POLLIN, 0 }
{ m_iopub, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 }
};

while (true)
{
zmq::poll(&items[0], 1, std::chrono::milliseconds(-1));

if (items[0].revents & ZMQ_POLLIN)
zmq::poll(&items[0], 2, std::chrono::milliseconds(-1));
try
{
zmq::multipart_t wire_msg;
wire_msg.recv(m_iopub);
try
if (items[0].revents & ZMQ_POLLIN)
{
xmessage msg = p_client_impl->deserialize(wire_msg);
wire_msg.recv(m_iopub);
xpub_message msg = p_client_impl->deserialize_iopub(wire_msg);
{
std::lock_guard<std::mutex> guard(m_queue_mutex);
m_message_queue.push(std::move(msg));
}
p_client_impl->notify_shell_listener(std::move(msg));
}
catch(std::exception& e)
if (items[1].revents & ZMQ_POLLIN)
{
std::cerr << e.what() << std::endl;
wire_msg.recv(m_controller);
if (wire_msg.size() > 0)
{
std::string received_msg = wire_msg.at(0).to_string();
if (received_msg == "stop")
{
break;
}
}
}
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
}
}
4 changes: 2 additions & 2 deletions src/client/xiopub_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ namespace xeus
~xiopub_client();

std::size_t iopub_queue_size() const;
std::optional<xmessage> pop_iopub_message();
std::optional<xpub_message> pop_iopub_message();

void run();

private:
zmq::socket_t m_iopub;
zmq::socket_t m_controller;

std::queue<xmessage> m_message_queue;
std::queue<xpub_message> m_message_queue;
mutable std::mutex m_queue_mutex;

xclient_zmq_impl* p_client_impl;
Expand Down

0 comments on commit 310c30e

Please sign in to comment.