Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --node-ipc support #2015

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
93 changes: 70 additions & 23 deletions plugin/core/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from queue import Queue
import http
import json
import multiprocessing.connection
import os
import shutil
import socket
Expand Down Expand Up @@ -48,26 +49,33 @@ def on_stderr_message(self, message: str) -> None:

class AbstractProcessor(Generic[T]):

def write_data(self, writer: IO[bytes], data: T) -> None:
def write_data(self, writer: IO[bytes], data: T, is_node_ipc: bool) -> None:
raise NotImplementedError()

def read_data(self, reader: IO[bytes]) -> Optional[T]:
def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[T]:
raise NotImplementedError()


class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]):

def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None:
def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None:
body = self._encode(data)
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
if not is_node_ipc:
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
else:
writer.write(body + b"\n")

def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]:
if not is_node_ipc:
headers = http.client.parse_headers(reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
else:
body = reader.readline()
alecmev marked this conversation as resolved.
Show resolved Hide resolved

def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]:
headers = http.client.parse_headers(reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
try:
return self._decode(body)
except Exception as ex:
Expand All @@ -79,7 +87,6 @@ def _encode(data: Dict[str, Any]) -> bytes:
return json.dumps(
data,
ensure_ascii=False,
sort_keys=False,
check_circular=False,
rchl marked this conversation as resolved.
Show resolved Hide resolved
separators=(',', ':')
).encode('utf-8')
Expand All @@ -93,7 +100,7 @@ class ProcessTransport(Transport[T]):

def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes],
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
callback_object: TransportCallbacks[T]) -> None:
callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None:
self._closed = False
self._process = process
self._socket = socket
Expand All @@ -105,6 +112,7 @@ def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket
self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name))
self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name))
self._callback_object = weakref.ref(callback_object)
self._is_node_ipc = is_node_ipc
self._send_queue = Queue(0) # type: Queue[Union[T, None]]
self._reader_thread.start()
self._writer_thread.start()
Expand Down Expand Up @@ -137,7 +145,7 @@ def __del__(self) -> None:
def _read_loop(self) -> None:
try:
while self._reader:
payload = self._processor.read_data(self._reader)
payload = self._processor.read_data(self._reader, self._is_node_ipc)
if payload is None:
continue

Expand Down Expand Up @@ -190,8 +198,9 @@ def _write_loop(self) -> None:
d = self._send_queue.get()
if d is None:
break
self._processor.write_data(self._writer, d)
self._writer.flush()
self._processor.write_data(self._writer, d, self._is_node_ipc)
if not self._is_node_ipc:
self._writer.flush()
except (BrokenPipeError, AttributeError):
pass
except Exception as ex:
Expand Down Expand Up @@ -223,24 +232,57 @@ def _stderr_loop(self) -> None:
json_rpc_processor = JsonRpcProcessor()


class NodeIpcIO():
_buf = bytearray()
_lines = 0

def __init__(self, conn: multiprocessing.connection._ConnectionBase):
self._conn = conn

# https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392
def readline(self) -> bytearray:
while self._lines == 0:
chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore
self._buf += chunk
self._lines += chunk.count(b'\n')

self._lines -= 1
line, _, self._buf = self._buf.partition(b'\n')
return line

# https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376
def write(self, data: bytes) -> None:
while len(data):
n = self._conn._write(self._conn.fileno(), data) # type: ignore
data = data[n:]


def create_transport(config: TransportConfig, cwd: Optional[str],
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]:
stderr = subprocess.PIPE
pass_fds = () # type: Union[Tuple[()], Tuple[int]]
if config.tcp_port is not None:
assert config.tcp_port is not None
if config.tcp_port < 0:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL
stdin = subprocess.DEVNULL
else:
elif not config.node_ipc:
stdout = subprocess.PIPE
stdin = subprocess.PIPE
else:
stdout = subprocess.PIPE
stdin = subprocess.DEVNULL
stderr = subprocess.STDOUT
pass_fds = (config.node_ipc.child_conn.fileno(),)

startupinfo = _fixup_startup_args(config.command)
sock = None # type: Optional[socket.socket]
process = None # type: Optional[subprocess.Popen]

def start_subprocess() -> subprocess.Popen:
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd)
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds)

if config.listener_socket:
assert isinstance(config.tcp_port, int) and config.tcp_port > 0
Expand All @@ -258,13 +300,16 @@ def start_subprocess() -> subprocess.Popen:
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port))
reader = sock.makefile('rwb') # type: ignore
writer = reader
else:
elif not config.node_ipc:
reader = process.stdout # type: ignore
writer = process.stdin # type: ignore
else:
reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore
if not reader or not writer:
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor,
callback_object)
stderr_reader = process.stdout if config.node_ipc else process.stderr
return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor,
callback_object, bool(config.node_ipc))


_subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen]
Expand Down Expand Up @@ -312,7 +357,8 @@ def _start_subprocess(
stderr: int,
startupinfo: Any,
env: Dict[str, str],
cwd: Optional[str]
cwd: Optional[str],
pass_fds: Union[Tuple[()], Tuple[int]]
) -> subprocess.Popen:
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd()))
process = subprocess.Popen(
Expand All @@ -322,7 +368,8 @@ def _start_subprocess(
stderr=stderr,
startupinfo=startupinfo,
env=env,
cwd=cwd)
cwd=cwd,
pass_fds=pass_fds)
_subprocesses.add(process)
return process

Expand Down
22 changes: 19 additions & 3 deletions plugin/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from wcmatch.glob import BRACE
from wcmatch.glob import globmatch
from wcmatch.glob import GLOBSTAR
import collections
alecmev marked this conversation as resolved.
Show resolved Hide resolved
import contextlib
import fnmatch
import multiprocessing
import os
import posixpath
import socket
Expand Down Expand Up @@ -605,24 +607,34 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]:
return _translate_path(uri, self._remote, self._local)


NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn')
alecmev marked this conversation as resolved.
Show resolved Hide resolved


class TransportConfig:
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket")
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc")

def __init__(
self,
name: str,
command: List[str],
tcp_port: Optional[int],
env: Dict[str, str],
listener_socket: Optional[socket.socket]
listener_socket: Optional[socket.socket],
node_ipc: Optional[NodeIpc]
) -> None:
if not command and not tcp_port:
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server')
if node_ipc and (tcp_port or listener_socket):
raise ValueError(
'"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; ' +
'cannot start a language server'
)
self.name = name
self.command = command
self.tcp_port = tcp_port
self.env = env
self.listener_socket = listener_socket
self.node_ipc = node_ipc


class ClientConfig:
Expand Down Expand Up @@ -790,7 +802,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key]
else:
env[key] = sublime.expand_variables(value, variables)
return TransportConfig(self.name, command, tcp_port, env, listener_socket)
node_ipc = None
if '--node-ipc' in command:
node_ipc = NodeIpc(*multiprocessing.Pipe())
env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno())
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc)

def set_view_status(self, view: sublime.View, message: str) -> None:
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"):
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Tox (http://tox.testrun.org/) is a tool for running tests
# Tox (https://github.com/tox-dev/tox) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
Expand Down