diff --git a/neovim/api/common.py b/neovim/api/common.py index 63d8a52f..112a4cb2 100644 --- a/neovim/api/common.py +++ b/neovim/api/common.py @@ -211,6 +211,10 @@ def threadsafe_call(self, fn, *args, **kwargs): """Wrapper for Session.threadsafe_call.""" self._session.threadsafe_call(fn, *args, **kwargs) + def poll_fd(self, fd, on_readable=None, on_writable=None): + """Wrapper around `Session.poll_fd`.""" + return self._session.poll_fd(fd, on_readable, on_writable) + def next_message(self): """Wrapper for Session.next_message.""" msg = self._session.next_message() diff --git a/neovim/msgpack_rpc/async_session.py b/neovim/msgpack_rpc/async_session.py index 826203ef..a6c72bf3 100644 --- a/neovim/msgpack_rpc/async_session.py +++ b/neovim/msgpack_rpc/async_session.py @@ -32,6 +32,10 @@ def threadsafe_call(self, fn): """Wrapper around `MsgpackStream.threadsafe_call`.""" self._msgpack_stream.threadsafe_call(fn) + def poll_fd(self, fd, on_readable, on_writable): + """Wrapper around `BaseEventLoop.poll_fd`.""" + return self._msgpack_stream.poll_fd(fd, on_readable, on_writable) + def request(self, method, args, response_cb): """Send a msgpack-rpc request to Nvim. diff --git a/neovim/msgpack_rpc/event_loop/asyncio.py b/neovim/msgpack_rpc/event_loop/asyncio.py index 49db6c57..b9610ed5 100644 --- a/neovim/msgpack_rpc/event_loop/asyncio.py +++ b/neovim/msgpack_rpc/event_loop/asyncio.py @@ -110,6 +110,18 @@ def _stop(self): def _threadsafe_call(self, fn): self._loop.call_soon_threadsafe(fn) + def _poll_fd(self, fd, on_readable, on_writable): + if on_readable is not None: + self._loop.add_reader(fd, on_readable) + if on_writable is not None: + self._loop.add_writer(fd, on_writable) + def cancel(): + if on_readable is not None: + self._loop.remove_reader(fd) + if on_writable is not None: + self._loop.remove_writer(fd) + return cancel + def _setup_signals(self, signals): self._signals = list(signals) for signum in self._signals: diff --git a/neovim/msgpack_rpc/event_loop/base.py b/neovim/msgpack_rpc/event_loop/base.py index a0dd0c01..91ce54a5 100644 --- a/neovim/msgpack_rpc/event_loop/base.py +++ b/neovim/msgpack_rpc/event_loop/base.py @@ -121,6 +121,18 @@ def threadsafe_call(self, fn): """ self._threadsafe_call(fn) + def poll_fd(self, fd, on_readable=None, on_writable=None): + """ + Invoke callbacks when the fd is ready for reading and/or writing. if `on_readable` is not None, it should be callback, which will be invokedi (with no arguments) when the fd is ready for writing. Similarily if `on_writable` is not None it will be invoked when the fd is ready for writing. + + Only one callback (of each kind) can be registered on the same fd at a time. If both readability and writability should be monitored, both callbacks must be registered by the same `poll_fd` call. + + Returns a function that deactivates the callback(s). + """ + if on_readable is None and on_writable is None: + raise ValueError("poll_fd: At least one of `on_readable` and `on_writable` must be present") + return self._poll_fd(fd, on_readable, on_writable) + def run(self, data_cb): """Run the event loop.""" if self._error: diff --git a/neovim/msgpack_rpc/event_loop/uv.py b/neovim/msgpack_rpc/event_loop/uv.py index 73daab42..1d6a7454 100644 --- a/neovim/msgpack_rpc/event_loop/uv.py +++ b/neovim/msgpack_rpc/event_loop/uv.py @@ -106,6 +106,22 @@ def _on_async(self, handle): while self._callbacks: self._callbacks.popleft()() + def _poll_fd(self, fd, on_readable, on_writable): + poll = pyuv.Poll(self._loop, fd) + events = 0 + if on_readable is not None: + events |= pyuv.UV_READABLE + if on_writable is not None: + events |= pyuv.UV_WRITABLE + def callback(poll_handle, evts, errorno): + if evts & pyuv.UV_READABLE: + on_readable() + if evts & pyuv.UV_WRITABLE: + on_writable() + + poll.start(events, callback) + return poll.stop + def _setup_signals(self, signals): self._signal_handles = [] diff --git a/neovim/msgpack_rpc/msgpack_stream.py b/neovim/msgpack_rpc/msgpack_stream.py index 6d51af6e..8735b0a3 100644 --- a/neovim/msgpack_rpc/msgpack_stream.py +++ b/neovim/msgpack_rpc/msgpack_stream.py @@ -31,6 +31,10 @@ def threadsafe_call(self, fn): """Wrapper around `BaseEventLoop.threadsafe_call`.""" self._event_loop.threadsafe_call(fn) + def poll_fd(self, fd, on_readable, on_writable): + """Wrapper around `BaseEventLoop.poll_fd`.""" + return self._event_loop.poll_fd(fd, on_readable, on_writable) + def send(self, msg): """Queue `msg` for sending to Nvim.""" debug('sent %s', msg) diff --git a/neovim/msgpack_rpc/session.py b/neovim/msgpack_rpc/session.py index 787482bc..6e3068e7 100644 --- a/neovim/msgpack_rpc/session.py +++ b/neovim/msgpack_rpc/session.py @@ -39,6 +39,10 @@ def greenlet_wrapper(): self._async_session.threadsafe_call(greenlet_wrapper) + def poll_fd(self, fd, on_readable, on_writable): + """Wrapper around `AsyncSession.threadsafe_call`.""" + self._async_session.poll_fd(fd, on_readable, on_writable) + def next_message(self): """Block until a message(request or notification) is available.