diff --git a/include/xeus-zmq/xclient_zmq.hpp b/include/xeus-zmq/xclient_zmq.hpp index 2194938..1397c3b 100644 --- a/include/xeus-zmq/xclient_zmq.hpp +++ b/include/xeus-zmq/xclient_zmq.hpp @@ -29,6 +29,7 @@ namespace xeus public: using listener = std::function; + using iopub_listener = std::function; using kernel_status_listener = std::function; explicit xclient_zmq(std::unique_ptr impl); @@ -42,16 +43,16 @@ namespace xeus void register_shell_listener(const listener& l); void register_control_listener(const listener& l); - void register_iopub_listener(const listener& l); + void register_iopub_listener(const iopub_listener& l); void register_kernel_status_listener(const kernel_status_listener& l); void notify_shell_listener(xmessage msg); void notify_control_listener(xmessage msg); - void notify_iopub_listener(xmessage msg); + void notify_iopub_listener(xpub_message msg); void notify_kernel_dead(bool status); std::size_t iopub_queue_size() const; - std::optional pop_iopub_message(); + std::optional pop_iopub_message(); void connect(); void stop_channels(); void start(); diff --git a/src/client/xclient_zmq.cpp b/src/client/xclient_zmq.cpp index 61fa71d..2279d65 100644 --- a/src/client/xclient_zmq.cpp +++ b/src/client/xclient_zmq.cpp @@ -53,7 +53,7 @@ namespace xeus p_client_impl->register_control_listener(l); } - void xclient_zmq::register_iopub_listener(const listener& l) + void xclient_zmq::register_iopub_listener(const iopub_listener& l) { p_client_impl->register_iopub_listener(l); } @@ -73,7 +73,7 @@ namespace xeus p_client_impl->notify_control_listener(std::move(msg)); } - void xclient_zmq::notify_iopub_listener(xmessage msg) + void xclient_zmq::notify_iopub_listener(xpub_message msg) { p_client_impl->notify_iopub_listener(std::move(msg)); } @@ -88,7 +88,7 @@ namespace xeus return p_client_impl->iopub_queue_size(); } - std::optional xclient_zmq::pop_iopub_message() + std::optional xclient_zmq::pop_iopub_message() { return p_client_impl->pop_iopub_message(); } diff --git a/src/client/xclient_zmq_impl.cpp b/src/client/xclient_zmq_impl.cpp index 67a9cc9..23b08df 100644 --- a/src/client/xclient_zmq_impl.cpp +++ b/src/client/xclient_zmq_impl.cpp @@ -89,12 +89,12 @@ namespace xeus return m_iopub_client.iopub_queue_size(); } - std::optional xclient_zmq_impl::pop_iopub_message() + std::optional xclient_zmq_impl::pop_iopub_message() { return m_iopub_client.pop_iopub_message(); } - void xclient_zmq_impl::register_iopub_listener(const listener& l) + void xclient_zmq_impl::register_iopub_listener(const iopub_listener& l) { m_iopub_listener = l; } @@ -124,7 +124,7 @@ namespace xeus m_control_listener(std::move(msg)); } - void xclient_zmq_impl::notify_iopub_listener(xmessage msg) + void xclient_zmq_impl::notify_iopub_listener(xpub_message msg) { m_iopub_listener(std::move(msg)); } @@ -169,7 +169,7 @@ namespace xeus void xclient_zmq_impl::wait_for_message() { - std::optional pending_message = pop_iopub_message(); + std::optional pending_message = pop_iopub_message(); if (pending_message.has_value()) { @@ -200,4 +200,9 @@ namespace xeus return xzmq_serializer::deserialize(wire_msg, *p_auth); } + xpub_message xclient_zmq_impl::deserialize_iopub(zmq::multipart_t& wire_msg) const + { + return xzmq_serializer::deserialize_iopub(wire_msg, *p_auth); + } + } diff --git a/src/client/xclient_zmq_impl.hpp b/src/client/xclient_zmq_impl.hpp index b52aa80..12477b6 100644 --- a/src/client/xclient_zmq_impl.hpp +++ b/src/client/xclient_zmq_impl.hpp @@ -34,6 +34,7 @@ namespace xeus using iopub_client_ptr = std::unique_ptr; using heartbeat_client_ptr = std::unique_ptr; using listener = std::function; + using iopub_listener = std::function; using kernel_status_listener = std::function; xclient_zmq_impl(zmq::context_t& context, @@ -60,8 +61,8 @@ namespace xeus // iopub channel std::size_t iopub_queue_size() const; - std::optional pop_iopub_message(); - void register_iopub_listener(const listener& l); + std::optional pop_iopub_message(); + void register_iopub_listener(const iopub_listener& l); // heartbeat channel void register_kernel_status_listener(const kernel_status_listener& l); @@ -72,13 +73,14 @@ namespace xeus void notify_shell_listener(xmessage msg); void notify_control_listener(xmessage msg); - void notify_iopub_listener(xmessage msg); + void notify_iopub_listener(xpub_message msg); void notify_kernel_dead(bool status); void wait_for_message(); void start(); xmessage deserialize(zmq::multipart_t& wire_msg) const; + xpub_message deserialize_iopub(zmq::multipart_t& wire_msg) const; private: void start_iopub_thread(); @@ -99,7 +101,7 @@ namespace xeus listener m_shell_listener; listener m_control_listener; - listener m_iopub_listener; + iopub_listener m_iopub_listener; iopub_client_ptr p_iopub_client; heartbeat_client_ptr p_heartbeat_client; diff --git a/src/client/xiopub_client.cpp b/src/client/xiopub_client.cpp index 9a8b105..019fa2a 100644 --- a/src/client/xiopub_client.cpp +++ b/src/client/xiopub_client.cpp @@ -36,12 +36,12 @@ namespace xeus return m_message_queue.size(); } - std::optional xiopub_client::pop_iopub_message() + std::optional xiopub_client::pop_iopub_message() { std::lock_guard guard(m_queue_mutex); if (!m_message_queue.empty()) { - xmessage msg = std::move(m_message_queue.back()); + xpub_message msg = std::move(m_message_queue.back()); m_message_queue.pop(); return msg; } else { @@ -51,32 +51,42 @@ namespace xeus void xiopub_client::run() { + zmq::multipart_t wire_msg; zmq::pollitem_t items[] = { - { m_iopub, 0, ZMQ_POLLIN, 0 } + { m_iopub, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 } }; while (true) { - zmq::poll(&items[0], 1, std::chrono::milliseconds(-1)); - - if (items[0].revents & ZMQ_POLLIN) + zmq::poll(&items[0], 2, std::chrono::milliseconds(-1)); + try { - zmq::multipart_t wire_msg; - wire_msg.recv(m_iopub); - try + if (items[0].revents & ZMQ_POLLIN) { - xmessage msg = p_client_impl->deserialize(wire_msg); + wire_msg.recv(m_iopub); + xpub_message msg = p_client_impl->deserialize_iopub(wire_msg); { std::lock_guard guard(m_queue_mutex); m_message_queue.push(std::move(msg)); } - p_client_impl->notify_shell_listener(std::move(msg)); } - catch(std::exception& e) + if (items[1].revents & ZMQ_POLLIN) { - std::cerr << e.what() << std::endl; + wire_msg.recv(m_controller); + if (wire_msg.size() > 0) + { + std::string received_msg = wire_msg.at(0).to_string(); + if (received_msg == "stop") + { + break; + } + } } } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + } } } } diff --git a/src/client/xiopub_client.hpp b/src/client/xiopub_client.hpp index 79689fb..f1d149a 100644 --- a/src/client/xiopub_client.hpp +++ b/src/client/xiopub_client.hpp @@ -32,7 +32,7 @@ namespace xeus ~xiopub_client(); std::size_t iopub_queue_size() const; - std::optional pop_iopub_message(); + std::optional pop_iopub_message(); void run(); @@ -40,7 +40,7 @@ namespace xeus zmq::socket_t m_iopub; zmq::socket_t m_controller; - std::queue m_message_queue; + std::queue m_message_queue; mutable std::mutex m_queue_mutex; xclient_zmq_impl* p_client_impl;