Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of jep #65 #31

Merged
merged 5 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions include/xeus-zmq/xserver_zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@
#define XEUS_SERVER_IMPL_HPP

#include "zmq.hpp"
#include "zmq_addon.hpp"

#include "xeus/xeus_context.hpp"
#include "xeus/xkernel_configuration.hpp"
#include "xeus/xserver.hpp"

#include "xeus-zmq.hpp"
#include "xauthentication.hpp"
#include "xthread.hpp"

namespace xeus
{
class xauthentication;
class xpublisher;
class xheartbeat;
class xtrivial_messenger;
Expand All @@ -39,6 +40,8 @@ namespace xeus

~xserver_zmq() override;

zmq::multipart_t serialize_iopub(xpub_message&& msg);

using xserver::notify_internal_listener;

protected:
Expand Down Expand Up @@ -67,6 +70,9 @@ namespace xeus
zmq::socket_t m_publisher_controller;
zmq::socket_t m_heartbeat_controller;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;

publisher_ptr p_publisher;
heartbeat_ptr p_heartbeat;

Expand All @@ -76,8 +82,6 @@ namespace xeus
using trivial_messenger_ptr = std::unique_ptr<xtrivial_messenger>;
trivial_messenger_ptr p_messenger;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;
nl::json::error_handler_t m_error_handler;

bool m_request_stop;
Expand Down
9 changes: 6 additions & 3 deletions include/xeus-zmq/xserver_zmq_split.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
#include "xeus/xkernel_configuration.hpp"

#include "xeus-zmq.hpp"
#include "xauthentication.hpp"
#include "xthread.hpp"

namespace xeus
{
class xauthentication;
class xcontrol;
class xheartbeat;
class xpublisher;
Expand Down Expand Up @@ -54,6 +54,8 @@ namespace xeus

xmessage deserialize(zmq::multipart_t& wire_msg) const;

zmq::multipart_t serialize_iopub(xpub_message&& msg);

protected:

xcontrol_messenger& get_control_messenger_impl() override;
Expand Down Expand Up @@ -82,6 +84,9 @@ namespace xeus

virtual void start_server(zmq::multipart_t& wire_msg) = 0;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;

controller_ptr p_controller;
heartbeat_ptr p_heartbeat;
publisher_ptr p_publisher;
Expand All @@ -92,8 +97,6 @@ namespace xeus
xthread m_iopub_thread;
xthread m_shell_thread;

using authentication_ptr = std::unique_ptr<xauthentication>;
authentication_ptr p_auth;
nl::json::error_handler_t m_error_handler;

std::atomic<bool> m_control_stopped;
Expand Down
61 changes: 56 additions & 5 deletions src/xpublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,27 @@
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include <string>
#include <iostream>
#include <string>

#include "zmq_addon.hpp"
#include "xeus-zmq/xmiddleware.hpp"
#include "xpublisher.hpp"

namespace xeus
{
xpublisher::xpublisher(zmq::context_t& context,
std::function<zmq::multipart_t(xpub_message&&)> serialize_iopub_msg_cb,
const std::string& transport,
const std::string& ip,
const std::string& port)
: m_publisher(context, zmq::socket_type::pub)
: m_publisher(context, zmq::socket_type::xpub)
, m_listener(context, zmq::socket_type::sub)
, m_controller(context, zmq::socket_type::rep)
, m_serialize_iopub_msg_cb(std::move(serialize_iopub_msg_cb))
{
init_socket(m_publisher, transport, ip, port);
// Set xpub_verbose option to 1 to pass all subscription messages (not only unique ones).
m_publisher.set(zmq::sockopt::xpub_verbose, 1);
m_listener.set(zmq::sockopt::subscribe, "");
m_listener.bind(get_publisher_end_point());
m_controller.set(zmq::sockopt::linger, get_socket_linger());
Expand All @@ -35,6 +38,16 @@ namespace xeus
{
}

xpub_message xpublisher::create_xpub_message(const std::string& topic)
{
xmessage_base_data data;
data.m_header = xeus::make_header("iopub_welcome", "", "");
data.m_content["subscription"] = topic;
xpub_message p_msg("", std::move(data));

return p_msg;
}

std::string xpublisher::get_port() const
{
return get_socket_port(m_publisher);
Expand All @@ -44,12 +57,13 @@ namespace xeus
{
zmq::pollitem_t items[] = {
{ m_listener, 0, ZMQ_POLLIN, 0 },
{ m_controller, 0, ZMQ_POLLIN, 0 }
{ m_controller, 0, ZMQ_POLLIN, 0 },
{ m_publisher, 0, ZMQ_POLLIN, 0 }
};

while (true)
{
zmq::poll(&items[0], 2, std::chrono::milliseconds(-1));
zmq::poll(&items[0], 3, std::chrono::milliseconds(-1));

if (items[0].revents & ZMQ_POLLIN)
{
Expand All @@ -66,6 +80,43 @@ namespace xeus
wire_msg.send(m_controller);
break;
}

if (items[2].revents & ZMQ_POLLIN)
{
// Received event: Single frame
// Either `1{subscription-topic}` for subscription
// or `0{subscription-topic}` for unsubscription
zmq::multipart_t wire_msg;
wire_msg.recv(m_publisher);

// Received event should be a single frame
if (wire_msg.size() != 1)
{
throw std::runtime_error("ERROR: Received message on XPUB is not a single frame");
}

zmq::message_t frame = wire_msg.pop();

// Event is one byte 0 = unsub or 1 = sub, followed by topic
uint8_t *event = (uint8_t *)frame.data();
// If subscription (unsubscription is ignored)
if (event[0] == 1)
{
std::string topic((char *)(event + 1), frame.size() - 1);
if (m_serialize_iopub_msg_cb)
{
// Construct the `iopub_welcome` message
xpub_message p_msg = create_xpub_message(topic);
zmq::multipart_t iopub_welcome_wire_msg = m_serialize_iopub_msg_cb(std::move(p_msg));
// Send the `iopub_welcome` message
iopub_welcome_wire_msg.send(m_publisher);
}
else
{
throw std::runtime_error("ERROR: IOPUB serialization callback not set");
}
}
}
}
}
}
9 changes: 9 additions & 0 deletions src/xpublisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
#ifndef XEUS_PUBLISHER_HPP
#define XEUS_PUBLISHER_HPP

#include <functional>
#include <string>

#include "zmq.hpp"
#include "zmq_addon.hpp"

#include "xeus/xmessage.hpp"

namespace xeus
{
Expand All @@ -21,6 +25,7 @@ namespace xeus
public:

xpublisher(zmq::context_t& context,
std::function<zmq::multipart_t(xpub_message&&)> serialize_iopub_msg_cb,
const std::string& transport,
const std::string& ip,
const std::string& port);
Expand All @@ -33,9 +38,13 @@ namespace xeus

private:

xpub_message create_xpub_message(const std::string& topic);

zmq::socket_t m_publisher;
zmq::socket_t m_listener;
zmq::socket_t m_controller;

std::function<zmq::multipart_t(xpub_message&&)> m_serialize_iopub_msg_cb;
};
}

Expand Down
13 changes: 10 additions & 3 deletions src/xserver_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
#include <chrono>
#include <iostream>

#include "zmq_addon.hpp"
#include "xeus/xguid.hpp"
#include "xeus-zmq/xauthentication.hpp"
#include "xeus-zmq/xserver_zmq.hpp"
#include "xeus-zmq/xmiddleware.hpp"
#include "xeus-zmq/xzmq_serializer.hpp"
Expand All @@ -31,12 +31,14 @@ namespace xeus
, m_publisher_pub(context, zmq::socket_type::pub)
, m_publisher_controller(context, zmq::socket_type::req)
, m_heartbeat_controller(context, zmq::socket_type::req)
, p_publisher(new xpublisher(context, config.m_transport, config.m_ip, config.m_iopub_port))
, p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, p_publisher(new xpublisher(context,
std::bind(&xserver_zmq::serialize_iopub, this, std::placeholders::_1),
config.m_transport, config.m_ip, config.m_iopub_port))
, p_heartbeat(new xheartbeat(context, config.m_transport, config.m_ip, config.m_hb_port))
, m_iopub_thread()
, m_hb_thread()
, p_messenger(new xtrivial_messenger(this))
, p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, m_error_handler(eh)
, m_request_stop(false)
{
Expand Down Expand Up @@ -207,6 +209,11 @@ namespace xeus
(void)m_heartbeat_controller.recv(response);
}

zmq::multipart_t xserver_zmq::serialize_iopub(xpub_message&& msg)
{
return xzmq_serializer::serialize_iopub(std::move(msg), *p_auth, m_error_handler);
}

std::unique_ptr<xserver> make_xserver_zmq(xcontext& context,
const xconfiguration& config,
nl::json::error_handler_t eh)
Expand Down
14 changes: 11 additions & 3 deletions src/xserver_zmq_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "zmq_addon.hpp"
#include "xeus/xguid.hpp"
#include "xeus-zmq/xauthentication.hpp"
#include "xeus-zmq/xserver_zmq_split.hpp"
#include "xeus-zmq/xmiddleware.hpp"
#include "xeus-zmq/xzmq_serializer.hpp"
Expand All @@ -26,15 +27,17 @@ namespace xeus
xserver_zmq_split::xserver_zmq_split(zmq::context_t& context,
const xconfiguration& config,
nl::json::error_handler_t eh)
: p_controller(new xcontrol(context, config.m_transport, config.m_ip ,config.m_control_port, this))
: p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, p_controller(new xcontrol(context, config.m_transport, config.m_ip ,config.m_control_port, this))
, p_heartbeat(new xheartbeat(context, config.m_transport, config.m_ip, config.m_hb_port))
, p_publisher(new xpublisher(context, config.m_transport, config.m_ip, config.m_iopub_port))
, p_publisher(new xpublisher(context,
std::bind(&xserver_zmq_split::serialize_iopub, this, std::placeholders::_1),
config.m_transport, config.m_ip, config.m_iopub_port))
, p_shell(new xshell(context, config.m_transport, config.m_ip ,config.m_shell_port, config.m_stdin_port, this))
, m_control_thread()
, m_hb_thread()
, m_iopub_thread()
, m_shell_thread()
, p_auth(make_xauthentication(config.m_signature_scheme, config.m_key))
, m_error_handler(eh)
, m_control_stopped(false)
{
Expand Down Expand Up @@ -157,5 +160,10 @@ namespace xeus
{
return m_control_stopped;
}

zmq::multipart_t xserver_zmq_split::serialize_iopub(xpub_message&& msg)
{
return xzmq_serializer::serialize_iopub(std::move(msg), *p_auth, m_error_handler);
}
}

4 changes: 4 additions & 0 deletions test/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def test_xeus_stderr(self):
self.assertEqual(output_msgs[0]['content']['name'], 'stderr')
self.assertEqual(output_msgs[0]['content']['text'], 'error')

class XeusIopubWelcomeTests(jupyter_kernel_test.IopubWelcomeTests):

kernel_name = "test_kernel"
support_iopub_welcome = True

if __name__ == '__main__':
unittest.main()
4 changes: 4 additions & 0 deletions test/test_kernel_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def test_xeus_stderr(self):
self.assertEqual(output_msgs[0]['content']['name'], 'stderr')
self.assertEqual(output_msgs[0]['content']['text'], 'error')

class XeusIopubWelcomeTests(jupyter_kernel_test.IopubWelcomeTests):

kernel_name = "test_kernel_control"
support_iopub_welcome = True

if __name__ == '__main__':
unittest.main()
4 changes: 4 additions & 0 deletions test/test_kernel_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def test_xeus_stderr(self):
self.assertEqual(output_msgs[0]['content']['name'], 'stderr')
self.assertEqual(output_msgs[0]['content']['text'], 'error')

class XeusIopubWelcomeTests(jupyter_kernel_test.IopubWelcomeTests):

kernel_name = "test_kernel_shell"
support_iopub_welcome = True

if __name__ == '__main__':
unittest.main()
Loading