diff --git a/neovim/api/common.py b/neovim/api/common.py index cc094d60..c2aa5629 100644 --- a/neovim/api/common.py +++ b/neovim/api/common.py @@ -208,6 +208,10 @@ def threadsafe_call(self, fn, *args, **kwargs): """Wrapper for Session.threadsafe_call.""" self._session.threadsafe_call(fn, *args, **kwargs) + def poll_fd(self, fn, fd, readable=False, writable=False): + """Wrapper around `Session.poll_fd`.""" + return self._session.poll_fd(fn, fd, readable, writable) + def next_message(self): """Wrapper for Session.next_message.""" msg = self._session.next_message() diff --git a/neovim/msgpack_rpc/async_session.py b/neovim/msgpack_rpc/async_session.py index 826203ef..c1c527bb 100644 --- a/neovim/msgpack_rpc/async_session.py +++ b/neovim/msgpack_rpc/async_session.py @@ -32,6 +32,10 @@ def threadsafe_call(self, fn): """Wrapper around `MsgpackStream.threadsafe_call`.""" self._msgpack_stream.threadsafe_call(fn) + def poll_fd(self, fn, fd, readable, writable): + """Wrapper around `BaseEventLoop.poll_fd`.""" + return self._msgpack_stream.poll_fd(fn, fd, readable, writable) + def request(self, method, args, response_cb): """Send a msgpack-rpc request to Nvim. diff --git a/neovim/msgpack_rpc/event_loop/asyncio.py b/neovim/msgpack_rpc/event_loop/asyncio.py index 4d687673..cf0b63ec 100644 --- a/neovim/msgpack_rpc/event_loop/asyncio.py +++ b/neovim/msgpack_rpc/event_loop/asyncio.py @@ -110,6 +110,18 @@ def _stop(self): def _threadsafe_call(self, fn): self._loop.call_soon_threadsafe(fn) + def _poll_fd(self, fn, fd, readable, writable): + if readable: + self._loop.add_reader(fd, lambda: fn(fd, True, False)) + if writable: + self._loop.add_writer(fd, lambda: fn(fd, False, True)) + def cancel(): + if readable: + self._loop.remove_reader(fd) + if writable: + self._loop.remove_writer(fd) + return cancel + def _setup_signals(self, signals): self._signals = list(signals) for signum in self._signals: diff --git a/neovim/msgpack_rpc/event_loop/base.py b/neovim/msgpack_rpc/event_loop/base.py index 96fe032a..b863dd8d 100644 --- a/neovim/msgpack_rpc/event_loop/base.py +++ b/neovim/msgpack_rpc/event_loop/base.py @@ -119,6 +119,17 @@ def threadsafe_call(self, fn): """ self._threadsafe_call(fn) + def poll_fd(self, fn, fd, readable=False, writable=False): + """Call a function when the fd is ready for reading and/or writing. `readable` and `writeable` + are boolean flags determing which condition/s/ should be polled for. + + The calback signature is fn(fd, is_readable, is_writable) + Only one callback may be registered per fd. Returns function to deactivate the callback. + """ + if not readable and not writable: + raise ValueError("poll_fd: At least one of `readable` and `writable` must be True") + return self._poll_fd(fn, fd, readable, writable) + def run(self, data_cb): """Run the event loop.""" if self._error: diff --git a/neovim/msgpack_rpc/event_loop/uv.py b/neovim/msgpack_rpc/event_loop/uv.py index 73daab42..4603e39d 100644 --- a/neovim/msgpack_rpc/event_loop/uv.py +++ b/neovim/msgpack_rpc/event_loop/uv.py @@ -106,6 +106,21 @@ def _on_async(self, handle): while self._callbacks: self._callbacks.popleft()() + def _poll_fd(self, fn, fd, readable, writable): + poll = pyuv.Poll(self._loop, fd) + events = 0 + if readable: + events |= pyuv.UV_READABLE + if writable: + events |= pyuv.UV_WRITABLE + def callback(poll_handle, evts, errorno): + is_readable = evts & pyuv.UV_READABLE + is_writable = evts & pyuv.UV_WRITABLE + fn(fd, is_readable, is_writable) + + poll.start(events, fn) + return poll.stop + def _setup_signals(self, signals): self._signal_handles = [] diff --git a/neovim/msgpack_rpc/msgpack_stream.py b/neovim/msgpack_rpc/msgpack_stream.py index 6d51af6e..c5c83687 100644 --- a/neovim/msgpack_rpc/msgpack_stream.py +++ b/neovim/msgpack_rpc/msgpack_stream.py @@ -31,6 +31,10 @@ def threadsafe_call(self, fn): """Wrapper around `BaseEventLoop.threadsafe_call`.""" self._event_loop.threadsafe_call(fn) + def poll_fd(self, fn, fd, readable, writable): + """Wrapper around `BaseEventLoop.poll_fd`.""" + return self._event_loop.poll_fd(fn, fd, readable, writable) + def send(self, msg): """Queue `msg` for sending to Nvim.""" debug('sent %s', msg) diff --git a/neovim/msgpack_rpc/session.py b/neovim/msgpack_rpc/session.py index cc217a66..b3af2f7e 100644 --- a/neovim/msgpack_rpc/session.py +++ b/neovim/msgpack_rpc/session.py @@ -39,6 +39,14 @@ def greenlet_wrapper(): self._async_session.threadsafe_call(greenlet_wrapper) + def poll_fd(self, fn, fd, readable, writable): + """Wrapper around `AsyncSession.threadsafe_call`.""" + def greenlet_wrapper(fd, is_readable, is_writable): + gr = greenlet.greenlet(fn) + gr.switch(fd, is_readable, is_writable) + + self._async_session.poll_fd(greenlet_wrapper, fd, readable, writable) + def next_message(self): """Block until a message(request or notification) is available.