Skip to content

Commit

Permalink
Merge pull request #31 from Hind-M/xpub
Browse files Browse the repository at this point in the history
Implementation of jep #65
  • Loading branch information
JohanMabille authored Sep 5, 2023
2 parents 952119f + 0fb77fe commit eeed108
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 18 deletions.
2 changes: 1 addition & 1 deletion environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ dependencies:
# Test dependencies
- doctest >= 2.4.6
- pytest
- jupyter_kernel_test>=0.5,<0.6
- jupyter_kernel_test>=0.6,<0.7
10 changes: 7 additions & 3 deletions include/xeus-zmq/xserver_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@
#define XEUS_SERVER_IMPL_HPP

#include "zmq.hpp"
#include "zmq_addon.hpp"

#include "xeus/xeus_context.hpp"
#include "xeus/xkernel_configuration.hpp"
#include "xeus/xserver.hpp"

#include "xeus-zmq.hpp"
#include "xauthentication.hpp"
#include "xthread.hpp"

namespace xeus
{
class xauthentication;
class xpublisher;
class xheartbeat;
class xtrivial_messenger;
Expand All @@ -39,6 +40,8 @@ namespace xeus

~xserver_zmq() override;

zmq::multipart_t serialize_iopub(xpub_message&& msg);

using xserver::notify_internal_listener;

protected:
Expand Down Expand Up @@ -67,6 +70,9 @@ namespace xeus
zmq::socket_t m_publisher_controller;
zmq::socket_t m_heartbeat_controller;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;

publisher_ptr p_publisher;
heartbeat_ptr p_heartbeat;

Expand All @@ -76,8 +82,6 @@ namespace xeus
using trivial_messenger_ptr = std::unique_ptr<xtrivial_messenger>;
trivial_messenger_ptr p_messenger;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;
nl::json::error_handler_t m_error_handler;

bool m_request_stop;
Expand Down
9 changes: 6 additions & 3 deletions include/xeus-zmq/xserver_zmq_split.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
#include "xeus/xkernel_configuration.hpp"

#include "xeus-zmq.hpp"
#include "xauthentication.hpp"
#include "xthread.hpp"

namespace xeus
{
class xauthentication;
class xcontrol;
class xheartbeat;
class xpublisher;
Expand Down Expand Up @@ -54,6 +54,8 @@ namespace xeus

xmessage deserialize(zmq::multipart_t& wire_msg) const;

zmq::multipart_t serialize_iopub(xpub_message&& msg);

protected:

xcontrol_messenger& get_control_messenger_impl() override;
Expand Down Expand Up @@ -82,6 +84,9 @@ namespace xeus

virtual void start_server(zmq::multipart_t& wire_msg) = 0;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;

controller_ptr p_controller;
heartbeat_ptr p_heartbeat;
publisher_ptr p_publisher;
Expand All @@ -92,8 +97,6 @@ namespace xeus
xthread m_iopub_thread;
xthread m_shell_thread;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;
nl::json::error_handler_t m_error_handler;

std::atomic<bool> m_control_stopped;
Expand Down
61 changes: 56 additions & 5 deletions src/xpublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,27 @@
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include <string>
#include <iostream>
#include <string>

#include "zmq_addon.hpp"
#include "xeus-zmq/xmiddleware.hpp"
#include "xpublisher.hpp"

namespace xeus
{
xpublisher::xpublisher(zmq::context_t& context,
std::function<zmq::multipart_t(xpub_message&&)> serialize_iopub_msg_cb,
const std::string& transport,
const std::string& ip,
const std::string& port)
: m_publisher(context, zmq::socket_type::pub)
: m_publisher(context, zmq::socket_type::xpub)
, m_listener(context, zmq::socket_type::sub)
, m_controller(context, zmq::socket_type::rep)
, m_serialize_iopub_msg_cb(std::move(serialize_iopub_msg_cb))
{
init_socket(m_publisher, transport, ip, port);
// Set xpub_verbose option to 1 to pass all subscription messages (not only unique ones).
m_publisher.set(zmq::sockopt::xpub_verbose, 1);
m_listener.set(zmq::sockopt::subscribe, "");
m_listener.bind(get_publisher_end_point());
m_controller.set(zmq::sockopt::linger, get_socket_linger());
Expand All @@ -35,6 +38,16 @@ namespace xeus
{
}

xpub_message xpublisher::create_xpub_message(const std::string& topic)
{
xmessage_base_data data;
data.m_header = xeus::make_header("iopub_welcome", "", "");
data.m_content["subscription"] = topic;
xpub_message p_msg("", std::move(data));

return p_msg;
}

std::string xpublisher::get_port() const
{
return get_socket_port(m_publisher);
Expand All @@ -44,12 +57,13 @@ namespace xeus
{
zmq::pollitem_t items[] = {
{ m_listener, 0, ZMQ_POLLIN, 0 },
{ m_controller, 0, ZMQ_POLLIN, 0 }
{ m_controller, 0, ZMQ_POLLIN, 0 },
{ m_publisher, 0, ZMQ_POLLIN, 0 }
};

while (true)
{
zmq::poll(&items[0], 2, std::chrono::milliseconds(-1));
zmq::poll(&items[0], 3, std::chrono::milliseconds(-1));

if (items[0].revents & ZMQ_POLLIN)
{
Expand All @@ -66,6 +80,43 @@ namespace xeus
wire_msg.send(m_controller);
break;
}

if (items[2].revents & ZMQ_POLLIN)
{
// Received event: Single frame
// Either `1{subscription-topic}` for subscription
// or `0{subscription-topic}` for unsubscription
zmq::multipart_t wire_msg;
wire_msg.recv(m_publisher);

// Received event should be a single frame
if (wire_msg.size() != 1)
{
throw std::runtime_error("ERROR: Received message on XPUB is not a single frame");
}

zmq::message_t frame = wire_msg.pop();

// Event is one byte 0 = unsub or 1 = sub, followed by topic
uint8_t *event = (uint8_t *)frame.data();
// If subscription (unsubscription is ignored)
if (event[0] == 1)
{
std::string topic((char *)(event + 1), frame.size() - 1);
if (m_serialize_iopub_msg_cb)
{
// Construct the `iopub_welcome` message
xpub_message p_msg = create_xpub_message(topic);
zmq::multipart_t iopub_welcome_wire_msg = m_serialize_iopub_msg_cb(std::move(p_msg));
// Send the `iopub_welcome` message
iopub_welcome_wire_msg.send(m_publisher);
}
else
{
throw std::runtime_error("ERROR: IOPUB serialization callback not set");
}
}
}
}
}
}
9 changes: 9 additions & 0 deletions src/xpublisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
#ifndef XEUS_PUBLISHER_HPP
#define XEUS_PUBLISHER_HPP

#include <functional>
#include <string>

#include "zmq.hpp"
#include "zmq_addon.hpp"

#include "xeus/xmessage.hpp"

namespace xeus
{
Expand All @@ -21,6 +25,7 @@ namespace xeus
public:

xpublisher(zmq::context_t& context,
std::function<zmq::multipart_t(xpub_message&&)> serialize_iopub_msg_cb,
const std::string& transport,
const std::string& ip,
const std::string& port);
Expand All @@ -33,9 +38,13 @@ namespace xeus

private:

xpub_message create_xpub_message(const std::string& topic);

zmq::socket_t m_publisher;
zmq::socket_t m_listener;
zmq::socket_t m_controller;

std::function<zmq::multipart_t(xpub_message&&)> m_serialize_iopub_msg_cb;
};
}

Expand Down
13 changes: 10 additions & 3 deletions src/xserver_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
#include <chrono>
#include <iostream>

#include "zmq_addon.hpp"
#include "xeus/xguid.hpp"
#include "xeus-zmq/xauthentication.hpp"
#include "xeus-zmq/xserver_zmq.hpp"
#include "xeus-zmq/xmiddleware.hpp"
#include "xeus-zmq/xzmq_serializer.hpp"
Expand All @@ -31,12 +31,14 @@ namespace xeus
, m_publisher_pub(context, zmq::socket_type::pub)
, m_publisher_controller(context, zmq::socket_type::req)
, m_heartbeat_controller(context, zmq::socket_type::req)
, p_publisher(new xpublisher(context, config.m_transport, config.m_ip, config.m_iopub_port))
, p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, p_publisher(new xpublisher(context,
std::bind(&xserver_zmq::serialize_iopub, this, std::placeholders::_1),
config.m_transport, config.m_ip, config.m_iopub_port))
, p_heartbeat(new xheartbeat(context, config.m_transport, config.m_ip, config.m_hb_port))
, m_iopub_thread()
, m_hb_thread()
, p_messenger(new xtrivial_messenger(this))
, p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, m_error_handler(eh)
, m_request_stop(false)
{
Expand Down Expand Up @@ -207,6 +209,11 @@ namespace xeus
(void)m_heartbeat_controller.recv(response);
}

zmq::multipart_t xserver_zmq::serialize_iopub(xpub_message&& msg)
{
return xzmq_serializer::serialize_iopub(std::move(msg), *p_auth, m_error_handler);
}

std::unique_ptr<xserver> make_xserver_zmq(xcontext& context,
const xconfiguration& config,
nl::json::error_handler_t eh)
Expand Down
14 changes: 11 additions & 3 deletions src/xserver_zmq_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "zmq_addon.hpp"
#include "xeus/xguid.hpp"
#include "xeus-zmq/xauthentication.hpp"
#include "xeus-zmq/xserver_zmq_split.hpp"
#include "xeus-zmq/xmiddleware.hpp"
#include "xeus-zmq/xzmq_serializer.hpp"
Expand All @@ -26,15 +27,17 @@ namespace xeus
xserver_zmq_split::xserver_zmq_split(zmq::context_t& context,
const xconfiguration& config,
nl::json::error_handler_t eh)
: p_controller(new xcontrol(context, config.m_transport, config.m_ip ,config.m_control_port, this))
: p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, p_controller(new xcontrol(context, config.m_transport, config.m_ip ,config.m_control_port, this))
, p_heartbeat(new xheartbeat(context, config.m_transport, config.m_ip, config.m_hb_port))
, p_publisher(new xpublisher(context, config.m_transport, config.m_ip, config.m_iopub_port))
, p_publisher(new xpublisher(context,
std::bind(&xserver_zmq_split::serialize_iopub, this, std::placeholders::_1),
config.m_transport, config.m_ip, config.m_iopub_port))
, p_shell(new xshell(context, config.m_transport, config.m_ip ,config.m_shell_port, config.m_stdin_port, this))
, m_control_thread()
, m_hb_thread()
, m_iopub_thread()
, m_shell_thread()
, p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, m_error_handler(eh)
, m_control_stopped(false)
{
Expand Down Expand Up @@ -157,5 +160,10 @@ namespace xeus
{
return m_control_stopped;
}

zmq::multipart_t xserver_zmq_split::serialize_iopub(xpub_message&& msg)
{
return xzmq_serializer::serialize_iopub(std::move(msg), *p_auth, m_error_handler);
}
}

4 changes: 4 additions & 0 deletions test/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def test_xeus_stderr(self):
self.assertEqual(output_msgs[0]['content']['name'], 'stderr')
self.assertEqual(output_msgs[0]['content']['text'], 'error')

class XeusIopubWelcomeTests(jupyter_kernel_test.IopubWelcomeTests):

kernel_name = "test_kernel"
support_iopub_welcome = True

if __name__ == '__main__':
unittest.main()
4 changes: 4 additions & 0 deletions test/test_kernel_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def test_xeus_stderr(self):
self.assertEqual(output_msgs[0]['content']['name'], 'stderr')
self.assertEqual(output_msgs[0]['content']['text'], 'error')

class XeusIopubWelcomeTests(jupyter_kernel_test.IopubWelcomeTests):

kernel_name = "test_kernel_control"
support_iopub_welcome = True

if __name__ == '__main__':
unittest.main()
4 changes: 4 additions & 0 deletions test/test_kernel_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def test_xeus_stderr(self):
self.assertEqual(output_msgs[0]['content']['name'], 'stderr')
self.assertEqual(output_msgs[0]['content']['text'], 'error')

class XeusIopubWelcomeTests(jupyter_kernel_test.IopubWelcomeTests):

kernel_name = "test_kernel_shell"
support_iopub_welcome = True

if __name__ == '__main__':
unittest.main()

0 comments on commit eeed108

Please sign in to comment.