From 6d235b8508f655661ee91c4efd2a96a3bbdd27be Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Mon, 24 Jul 2023 18:17:09 +0200 Subject: [PATCH 1/2] Handle Iopub welcome message for jep#65 --- .../services/kernels/connection/channels.py | 75 ++++++++++++++++++- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/jupyter_server/services/kernels/connection/channels.py b/jupyter_server/services/kernels/connection/channels.py index 212954f301..2c67ec7121 100644 --- a/jupyter_server/services/kernels/connection/channels.py +++ b/jupyter_server/services/kernels/connection/channels.py @@ -151,6 +151,20 @@ def create_stream(self): self.channels[channel] = stream = meth(identity=identity) stream.channel = channel + def _is_iopub_welcome_supported(self): + """Check if the messaging protocol supports sending + Iopub welcome messages (i.e protocol_version >= 5.4) + """ + MIN_MAJOR_VERSION = 5 + MIN_MINOR_VERSION = 4 + protocol_version_parts = client_protocol_version.split(".") + if int(protocol_version_parts[0]) > MIN_MAJOR_VERSION: + return True + return bool( + int(protocol_version_parts[0]) == MIN_MAJOR_VERSION + and int(protocol_version_parts[1]) >= MIN_MINOR_VERSION + ) + def nudge(self): # noqa """Nudge the zmq connections with kernel_info_requests Returns a Future that will resolve when we have received @@ -268,6 +282,63 @@ def nudge(count): future.add_done_callback(finish) return _ensure_future(future) + def wait_for_iopub_welcome(self): + """Wait for an iopub welcome message + Since protocol_version >= 5.4 + """ + iopub_channel = self.channels["iopub"] + iopub_future: Future = Future() + + def on_iopub(msg): + """Handle iopub welcome message""" + idents, msg = self.session.feed_identities(msg) + try: + msg = self.session.deserialize(msg) + except BaseException: + self.log.error("Bad Iopub message", exc_info=True) + iopub_channel.stop_on_recv() + iopub_future.set_result(None) + else: + msg_type = msg["header"]["msg_type"] + if msg_type == "iopub_welcome": + self.log.debug("Iopub welcome message received: %s", self.kernel_id) + iopub_channel.stop_on_recv() + iopub_future.set_result(msg) + else: + self.log.error( + "Iopub welcome message not received, receiving %s instead", msg_type + ) + iopub_channel.stop_on_recv() + iopub_future.set_result(None) + + iopub_channel.on_recv(on_iopub) + loop = IOLoop.current() + + def give_up(): + """Don't wait forever for the kernel""" + if iopub_future.done(): + return + self.log.error("Timeout waiting for IOPub welcome message from %s", self.kernel_id) + iopub_channel.stop_on_recv() + iopub_future.set_result(None) + + # Resolve with a timeout if we get no response + timeout_future = gen.with_timeout(loop.time() + self.kernel_info_timeout, iopub_future) + timeout_future.add_done_callback(give_up) + return _ensure_future(timeout_future) + + def is_subscribed(self): + """Check zmq subscriptions depending on the protocol version. + Either with iopub welcome message if supported, or with `nudge` + """ + if self._is_iopub_welcome_supported(): + return self.wait_for_iopub_welcome() + else: + self.log.warning( + "Be aware that using an old kernel protocol may involve possible loss of early iopub messages" + ) + return self.nudge() + async def _register_session(self): """Ensure we aren't creating a duplicate session. @@ -342,7 +413,7 @@ def connect(self): # The kernel's ports have not changed; use the channels captured in the buffer self.channels = buffer_info["channels"] - connected = self.nudge() + connected = self.is_subscribed() def replay(value): replay_buffer = buffer_info["buffer"] @@ -356,7 +427,7 @@ def replay(value): else: try: self.create_stream() - connected = self.nudge() + connected = self.is_subscribed() except web.HTTPError as e: # Do not log error if the kernel is already shutdown, # as it's normal that it's not responding From 50c7a496462239eecc5361f997f3574fbce7d53b Mon Sep 17 00:00:00 2001 From: Hind Montassif Date: Mon, 24 Jul 2023 18:56:37 +0200 Subject: [PATCH 2/2] Add missing arg --- jupyter_server/services/kernels/connection/channels.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_server/services/kernels/connection/channels.py b/jupyter_server/services/kernels/connection/channels.py index 2c67ec7121..62874b94e3 100644 --- a/jupyter_server/services/kernels/connection/channels.py +++ b/jupyter_server/services/kernels/connection/channels.py @@ -314,7 +314,7 @@ def on_iopub(msg): iopub_channel.on_recv(on_iopub) loop = IOLoop.current() - def give_up(): + def give_up(value): """Don't wait forever for the kernel""" if iopub_future.done(): return