From f14e088a72088064dfa217381fa6e83603fed5c0 Mon Sep 17 00:00:00 2001 From: Alec Mev Date: Tue, 9 Aug 2022 16:51:51 +0300 Subject: [PATCH 01/10] Add --node-ipc support Fix https://github.com/sublimelsp/LSP/issues/1612 --- plugin/core/transports.py | 95 +++++++++++++++++++++++++++++---------- plugin/core/types.py | 23 ++++++++-- tox.ini | 2 +- 3 files changed, 93 insertions(+), 27 deletions(-) diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 21595bc5b..0f0faa016 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -7,6 +7,7 @@ from queue import Queue import http import json +import multiprocessing.connection import os import shutil import socket @@ -48,26 +49,33 @@ def on_stderr_message(self, message: str) -> None: class AbstractProcessor(Generic[T]): - def write_data(self, writer: IO[bytes], data: T) -> None: + def write_data(self, writer: IO[bytes], data: T, is_node_ipc: bool) -> None: raise NotImplementedError() - def read_data(self, reader: IO[bytes]) -> Optional[T]: + def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[T]: raise NotImplementedError() class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): - def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None: + def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None: body = self._encode(data) - writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) + if not is_node_ipc: + writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) + else: + writer.write(body + b"\n") + + def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]: + if not is_node_ipc: + headers = http.client.parse_headers(reader) # type: ignore + try: + body = reader.read(int(headers.get("Content-Length"))) + except TypeError: + # Expected error on process stopping. Stop the read loop. + raise StopLoopError() + else: + body = reader.readline() - def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: - headers = http.client.parse_headers(reader) # type: ignore - try: - body = reader.read(int(headers.get("Content-Length"))) - except TypeError: - # Expected error on process stopping. Stop the read loop. - raise StopLoopError() try: return self._decode(body) except Exception as ex: @@ -79,7 +87,6 @@ def _encode(data: Dict[str, Any]) -> bytes: return json.dumps( data, ensure_ascii=False, - sort_keys=False, check_circular=False, separators=(',', ':') ).encode('utf-8') @@ -93,7 +100,7 @@ class ProcessTransport(Transport[T]): def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], - callback_object: TransportCallbacks[T]) -> None: + callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None: self._closed = False self._process = process self._socket = socket @@ -105,6 +112,7 @@ def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name)) self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name)) self._callback_object = weakref.ref(callback_object) + self._is_node_ipc = is_node_ipc self._send_queue = Queue(0) # type: Queue[Union[T, None]] self._reader_thread.start() self._writer_thread.start() @@ -137,7 +145,7 @@ def __del__(self) -> None: def _read_loop(self) -> None: try: while self._reader: - payload = self._processor.read_data(self._reader) + payload = self._processor.read_data(self._reader, self._is_node_ipc) if payload is None: continue @@ -190,8 +198,9 @@ def _write_loop(self) -> None: d = self._send_queue.get() if d is None: break - self._processor.write_data(self._writer, d) - self._writer.flush() + self._processor.write_data(self._writer, d, self._is_node_ipc) + if not self._is_node_ipc: + self._writer.flush() except (BrokenPipeError, AttributeError): pass except Exception as ex: @@ -223,8 +232,37 @@ def _stderr_loop(self) -> None: json_rpc_processor = JsonRpcProcessor() +class NodeIpcIO(): + _buf = bytearray() + _lines = 0 + + def __init__(self, conn: multiprocessing.connection._ConnectionBase): + self._fd = conn.fileno() + self._read = conn._read # type: ignore + self._write = conn._write # type: ignore + + # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392 + def readline(self) -> bytearray: + while self._lines == 0: + chunk = self._read(self._fd, 65536) # type: bytes + self._buf += chunk + self._lines += chunk.count(b'\n') + + self._lines -= 1 + line, _, self._buf = self._buf.partition(b'\n') + return line + + # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376 + def write(self, data: bytes) -> None: + while len(data): + n = self._write(self._fd, data) # type: int + data = data[n:] + + def create_transport(config: TransportConfig, cwd: Optional[str], callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: + stderr = subprocess.PIPE + pass_fds = () # type: Union[Tuple[()], Tuple[int]] if config.tcp_port is not None: assert config.tcp_port is not None if config.tcp_port < 0: @@ -232,15 +270,21 @@ def create_transport(config: TransportConfig, cwd: Optional[str], else: stdout = subprocess.DEVNULL stdin = subprocess.DEVNULL - else: + elif not config.node_ipc: stdout = subprocess.PIPE stdin = subprocess.PIPE + else: + stdout = subprocess.PIPE + stdin = subprocess.DEVNULL + stderr = subprocess.STDOUT + pass_fds = (config.node_ipc.child_conn.fileno(),) + startupinfo = _fixup_startup_args(config.command) sock = None # type: Optional[socket.socket] process = None # type: Optional[subprocess.Popen] def start_subprocess() -> subprocess.Popen: - return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd) + return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds) if config.listener_socket: assert isinstance(config.tcp_port, int) and config.tcp_port > 0 @@ -258,13 +302,16 @@ def start_subprocess() -> subprocess.Popen: raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) reader = sock.makefile('rwb') # type: ignore writer = reader - else: + elif not config.node_ipc: reader = process.stdout # type: ignore writer = process.stdin # type: ignore + else: + reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore if not reader or not writer: raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) - return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor, - callback_object) + stderr_reader = process.stdout if config.node_ipc else process.stderr + return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor, + callback_object, bool(config.node_ipc)) _subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen] @@ -312,7 +359,8 @@ def _start_subprocess( stderr: int, startupinfo: Any, env: Dict[str, str], - cwd: Optional[str] + cwd: Optional[str], + pass_fds: Union[Tuple[()], Tuple[int]] ) -> subprocess.Popen: debug("starting {} in {}".format(args, cwd if cwd else os.getcwd())) process = subprocess.Popen( @@ -322,7 +370,8 @@ def _start_subprocess( stderr=stderr, startupinfo=startupinfo, env=env, - cwd=cwd) + cwd=cwd, + pass_fds=pass_fds) _subprocesses.add(process) return process diff --git a/plugin/core/types.py b/plugin/core/types.py index 85d352167..3c2db67da 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -10,8 +10,11 @@ from wcmatch.glob import BRACE from wcmatch.glob import globmatch from wcmatch.glob import GLOBSTAR +import collections import contextlib import fnmatch +import multiprocessing +import multiprocessing.connection import os import posixpath import socket @@ -605,8 +608,11 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: return _translate_path(uri, self._remote, self._local) +NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn') + + class TransportConfig: - __slots__ = ("name", "command", "tcp_port", "env", "listener_socket") + __slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc") def __init__( self, @@ -614,15 +620,22 @@ def __init__( command: List[str], tcp_port: Optional[int], env: Dict[str, str], - listener_socket: Optional[socket.socket] + listener_socket: Optional[socket.socket], + node_ipc: Optional[NodeIpc] ) -> None: if not command and not tcp_port: raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') + if node_ipc and (tcp_port or listener_socket): + raise ValueError( + '"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; ' + + 'cannot start a language server' + ) self.name = name self.command = command self.tcp_port = tcp_port self.env = env self.listener_socket = listener_socket + self.node_ipc = node_ipc class ClientConfig: @@ -790,7 +803,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] else: env[key] = sublime.expand_variables(value, variables) - return TransportConfig(self.name, command, tcp_port, env, listener_socket) + node_ipc = None + if '--node-ipc' in command: + node_ipc = NodeIpc(*multiprocessing.Pipe()) + env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno()) + return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) def set_view_status(self, view: sublime.View, message: str) -> None: if sublime.load_settings("LSP.sublime-settings").get("show_view_status"): diff --git a/tox.ini b/tox.ini index 0c7991ad2..9df60b5fa 100644 --- a/tox.ini +++ b/tox.ini @@ -1,4 +1,4 @@ -# Tox (http://tox.testrun.org/) is a tool for running tests +# Tox (https://github.com/tox-dev/tox) is a tool for running tests # in multiple virtualenvs. This configuration file will run the # test suite on all supported python versions. To use it, "pip install tox" # and then run "tox" from this directory. From 6ab3fca0d35d0ec8c7f3bad250ade2f376a50cf7 Mon Sep 17 00:00:00 2001 From: Alec Mev Date: Tue, 9 Aug 2022 18:12:55 +0300 Subject: [PATCH 02/10] Fix "Bad file descriptor" Somehow aliasing _read to conn._read breaks it, even though it's not a class method. --- plugin/core/transports.py | 8 +++----- plugin/core/types.py | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 0f0faa016..43dadd6fa 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -237,14 +237,12 @@ class NodeIpcIO(): _lines = 0 def __init__(self, conn: multiprocessing.connection._ConnectionBase): - self._fd = conn.fileno() - self._read = conn._read # type: ignore - self._write = conn._write # type: ignore + self._conn = conn # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392 def readline(self) -> bytearray: while self._lines == 0: - chunk = self._read(self._fd, 65536) # type: bytes + chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore self._buf += chunk self._lines += chunk.count(b'\n') @@ -255,7 +253,7 @@ def readline(self) -> bytearray: # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376 def write(self, data: bytes) -> None: while len(data): - n = self._write(self._fd, data) # type: int + n = self._conn._write(self._conn.fileno(), data) # type: ignore data = data[n:] diff --git a/plugin/core/types.py b/plugin/core/types.py index 3c2db67da..e609243a6 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -14,7 +14,6 @@ import contextlib import fnmatch import multiprocessing -import multiprocessing.connection import os import posixpath import socket From ca343727ea46dd6274875e2699769bc11d90537e Mon Sep 17 00:00:00 2001 From: Alec Mev Date: Wed, 10 Aug 2022 03:50:58 +0300 Subject: [PATCH 03/10] Refactor transport processors --- plugin/core/transports.py | 176 +++++++++++++++++++------------------- plugin/core/types.py | 20 ++--- sublime-package.json | 11 +++ tests/test_protocol.py | 6 +- 4 files changed, 111 insertions(+), 102 deletions(-) diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 43dadd6fa..19a9cea72 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -5,7 +5,7 @@ from contextlib import closing from functools import partial from queue import Queue -import http +import http.client import json import multiprocessing.connection import os @@ -49,70 +49,97 @@ def on_stderr_message(self, message: str) -> None: class AbstractProcessor(Generic[T]): - def write_data(self, writer: IO[bytes], data: T, is_node_ipc: bool) -> None: + def write_data(self, data: T) -> None: raise NotImplementedError() - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[T]: + def read_data(self) -> Optional[T]: raise NotImplementedError() -class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): +def encode_payload(data: Dict[str, Any]) -> bytes: + return json.dumps( + data, + ensure_ascii=False, + check_circular=False, + separators=(',', ':') + ).encode('utf-8') - def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None: - body = self._encode(data) - if not is_node_ipc: - writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) - else: - writer.write(body + b"\n") - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]: - if not is_node_ipc: - headers = http.client.parse_headers(reader) # type: ignore - try: - body = reader.read(int(headers.get("Content-Length"))) - except TypeError: - # Expected error on process stopping. Stop the read loop. - raise StopLoopError() - else: - body = reader.readline() +def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: + try: + return json.loads(message.decode('utf-8')) + except Exception as ex: + exception_log("JSON decode error", ex) + return None + + +class StandardProcessor(AbstractProcessor[Dict[str, Any]]): + def __init__(self, reader: IO[bytes], writer: IO[bytes]): + self._reader = reader + self._writer = writer + + def write_data(self, data: Dict[str, Any]) -> None: + body = encode_payload(data) + self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) + self._writer.flush() + + def read_data(self) -> Optional[Dict[str, Any]]: + headers = http.client.parse_headers(self._reader) # type: ignore try: - return self._decode(body) - except Exception as ex: - exception_log("JSON decode error", ex) - return None - - @staticmethod - def _encode(data: Dict[str, Any]) -> bytes: - return json.dumps( - data, - ensure_ascii=False, - check_circular=False, - separators=(',', ':') - ).encode('utf-8') - - @staticmethod - def _decode(message: bytes) -> Dict[str, Any]: - return json.loads(message.decode('utf-8')) + body = self._reader.read(int(headers.get("Content-Length"))) + except TypeError: + # Expected error on process stopping. Stop the read loop. + raise StopLoopError() + return decode_payload(body) + + +class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): + _buf = bytearray() + _lines = 0 + + def __init__(self, conn: multiprocessing.connection._ConnectionBase): + self._conn = conn + + def write_data(self, data: Dict[str, Any]) -> None: + body = encode_payload(data) + b"\n" + while len(body): + n = self._conn._write(self._conn.fileno(), body) # type: ignore + body = body[n:] + + def read_data(self) -> Optional[Dict[str, Any]]: + while self._lines == 0: + chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore + if len(chunk) == 0: + # EOF reached: https://docs.python.org/3/library/os.html#os.read + raise StopLoopError() + + self._buf += chunk + self._lines += chunk.count(b'\n') + + self._lines -= 1 + message, _, self._buf = self._buf.partition(b'\n') + return decode_payload(message) class ProcessTransport(Transport[T]): - def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], - writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], - callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None: + def __init__(self, + name: str, + process: subprocess.Popen, + socket: Optional[socket.socket], + stderr: Optional[IO[bytes]], + processor: AbstractProcessor[T], + callback_object: TransportCallbacks[T]) -> None: self._closed = False self._process = process self._socket = socket - self._reader = reader - self._writer = writer self._stderr = stderr self._processor = processor self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name)) self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name)) self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name)) self._callback_object = weakref.ref(callback_object) - self._is_node_ipc = is_node_ipc self._send_queue = Queue(0) # type: Queue[Union[T, None]] self._reader_thread.start() self._writer_thread.start() @@ -144,8 +171,8 @@ def __del__(self) -> None: def _read_loop(self) -> None: try: - while self._reader: - payload = self._processor.read_data(self._reader, self._is_node_ipc) + while True: + payload = self._processor.read_data() if payload is None: continue @@ -194,13 +221,11 @@ def invoke() -> None: def _write_loop(self) -> None: exception = None # type: Optional[Exception] try: - while self._writer: + while True: d = self._send_queue.get() if d is None: break - self._processor.write_data(self._writer, d, self._is_node_ipc) - if not self._is_node_ipc: - self._writer.flush() + self._processor.write_data(d) except (BrokenPipeError, AttributeError): pass except Exception as ex: @@ -228,35 +253,6 @@ def _stderr_loop(self) -> None: self._send_queue.put_nowait(None) -# Can be a singleton since it doesn't hold any state. -json_rpc_processor = JsonRpcProcessor() - - -class NodeIpcIO(): - _buf = bytearray() - _lines = 0 - - def __init__(self, conn: multiprocessing.connection._ConnectionBase): - self._conn = conn - - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392 - def readline(self) -> bytearray: - while self._lines == 0: - chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore - self._buf += chunk - self._lines += chunk.count(b'\n') - - self._lines -= 1 - line, _, self._buf = self._buf.partition(b'\n') - return line - - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376 - def write(self, data: bytes) -> None: - while len(data): - n = self._conn._write(self._conn.fileno(), data) # type: ignore - data = data[n:] - - def create_transport(config: TransportConfig, cwd: Optional[str], callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: stderr = subprocess.PIPE @@ -292,24 +288,27 @@ def start_subprocess() -> subprocess.Popen: config.listener_socket, start_subprocess ) + processor = StandardProcessor(reader, writer) # type: AbstractProcessor else: process = start_subprocess() if config.tcp_port: sock = _connect_tcp(config.tcp_port) if sock is None: raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) - reader = sock.makefile('rwb') # type: ignore - writer = reader + reader = writer = sock.makefile('rwb') + processor = StandardProcessor(reader, writer) elif not config.node_ipc: - reader = process.stdout # type: ignore - writer = process.stdin # type: ignore + if not process.stdout or not process.stdin: + raise RuntimeError( + 'Failed initializing transport: reader: {}, writer: {}' + .format(process.stdout, process.stdin) + ) + processor = StandardProcessor(process.stdout, process.stdin) else: - reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore - if not reader or not writer: - raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) + processor = NodeIpcProcessor(config.node_ipc.parent_conn) + stderr_reader = process.stdout if config.node_ipc else process.stderr - return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor, - callback_object, bool(config.node_ipc)) + return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object) _subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen] @@ -403,8 +402,7 @@ def start_in_background(d: _SubprocessData) -> None: # Await one client connection (blocking!) sock, _ = listener_socket.accept() thread.join() - reader = sock.makefile('rwb') # type: IO[bytes] - writer = reader + reader = writer = sock.makefile('rwb') assert data.process return data.process, sock, reader, writer diff --git a/plugin/core/types.py b/plugin/core/types.py index e609243a6..9ebb810bd 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -607,7 +607,7 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: return _translate_path(uri, self._remote, self._local) -NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn') +NodeIpcPipe = collections.namedtuple('NodeIpcPipe', 'parent_conn,child_conn') class TransportConfig: @@ -620,15 +620,10 @@ def __init__( tcp_port: Optional[int], env: Dict[str, str], listener_socket: Optional[socket.socket], - node_ipc: Optional[NodeIpc] + node_ipc: Optional[NodeIpcPipe] ) -> None: if not command and not tcp_port: raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') - if node_ipc and (tcp_port or listener_socket): - raise ValueError( - '"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; ' + - 'cannot start a language server' - ) self.name = name self.command = command self.tcp_port = tcp_port @@ -644,6 +639,7 @@ def __init__(self, priority_selector: Optional[str] = None, schemes: Optional[List[str]] = None, command: Optional[List[str]] = None, + use_node_ipc: bool = False, binary_args: Optional[List[str]] = None, # DEPRECATED tcp_port: Optional[int] = None, auto_complete_selector: Optional[str] = None, @@ -668,6 +664,7 @@ def __init__(self, else: assert isinstance(binary_args, list) self.command = binary_args + self.use_node_ipc = use_node_ipc self.tcp_port = tcp_port self.auto_complete_selector = auto_complete_selector self.enabled = enabled @@ -701,9 +698,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl priority_selector=_read_priority_selector(s), schemes=s.get("schemes"), command=read_list_setting(s, "command", []), + use_node_ipc=bool(s.get("use_node_ipc", False)), tcp_port=s.get("tcp_port"), auto_complete_selector=s.get("auto_complete_selector"), - # Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package. + # Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package. enabled=bool(s.get("enabled", True)), init_options=init_options, settings=settings, @@ -731,6 +729,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig": priority_selector=_read_priority_selector(d), schemes=schemes, command=d.get("command", []), + use_node_ipc=d.get("use_node_ipc", False), tcp_port=d.get("tcp_port"), auto_complete_selector=d.get("auto_complete_selector"), enabled=d.get("enabled", False), @@ -758,6 +757,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C priority_selector=_read_priority_selector(override) or src_config.priority_selector, schemes=override.get("schemes", src_config.schemes), command=override.get("command", src_config.command), + use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), tcp_port=override.get("tcp_port", src_config.tcp_port), auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector), enabled=override.get("enabled", src_config.enabled), @@ -803,8 +803,8 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig else: env[key] = sublime.expand_variables(value, variables) node_ipc = None - if '--node-ipc' in command: - node_ipc = NodeIpc(*multiprocessing.Pipe()) + if self.use_node_ipc: + node_ipc = NodeIpcPipe(*multiprocessing.Pipe()) env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno()) return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) diff --git a/sublime-package.json b/sublime-package.json index 73927b458..ed3b75773 100644 --- a/sublime-package.json +++ b/sublime-package.json @@ -75,6 +75,11 @@ }, "markdownDescription": "The command to start the language server." }, + "ClientUseNodeIpc": { + "type": "boolean", + "default": false, + "markdownDescription": "Communicate with the language server over Node.js IPC. This lets the server print to stdout without disrupting the LSP communication. It's non-standard, but is used by VSCode. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` in case of vscode-eslint. `tcp_port` is ignored if this is enabled." + }, "ClientEnabled": { "type": "boolean", "default": false, @@ -156,6 +161,9 @@ "command": { "$ref": "sublime://settings/LSP#/definitions/ClientCommand" }, + "use_node_ipc": { + "$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc" + }, "enabled": { "$ref": "sublime://settings/LSP#/definitions/ClientEnabled" }, @@ -555,6 +563,9 @@ "command": { "$ref": "sublime://settings/LSP#/definitions/ClientCommand" }, + "use_node_ipc": { + "$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc" + }, "enabled": { "$ref": "sublime://settings/LSP#/definitions/ClientEnabled" }, diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 8e14f1d6e..198e7563e 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,5 +1,5 @@ from LSP.plugin.core.protocol import Point, Position, Range, RangeLsp, Request, Notification -from LSP.plugin.core.transports import JsonRpcProcessor +from LSP.plugin.core.transports import encode_payload, decode_payload import unittest @@ -129,9 +129,9 @@ def test_extend(self) -> None: class EncodingTests(unittest.TestCase): def test_encode(self) -> None: - encoded = JsonRpcProcessor._encode({"text": "😃"}) + encoded = encode_payload({"text": "😃"}) self.assertEqual(encoded, b'{"text":"\xF0\x9F\x98\x83"}') - decoded = JsonRpcProcessor._decode(encoded) + decoded = decode_payload(encoded) self.assertEqual(decoded, {"text": "😃"}) From bdcd755fc72553b1edaf31061286363171444c05 Mon Sep 17 00:00:00 2001 From: Predrag Nikolic Date: Wed, 10 Aug 2022 12:03:13 +0200 Subject: [PATCH 04/10] do not modify interfaces --- plugin/core/transports.py | 84 +++++++++++++++++++-------------------- plugin/core/types.py | 10 +++-- 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 19a9cea72..52bc58cf1 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -49,10 +49,10 @@ def on_stderr_message(self, message: str) -> None: class AbstractProcessor(Generic[T]): - def write_data(self, data: T) -> None: + def write_data(self, writer: IO[bytes], data: T) -> None: raise NotImplementedError() - def read_data(self) -> Optional[T]: + def read_data(self, reader: IO[bytes]) -> Optional[T]: raise NotImplementedError() @@ -74,20 +74,15 @@ def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: class StandardProcessor(AbstractProcessor[Dict[str, Any]]): - - def __init__(self, reader: IO[bytes], writer: IO[bytes]): - self._reader = reader - self._writer = writer - - def write_data(self, data: Dict[str, Any]) -> None: + def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None: body = encode_payload(data) - self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) - self._writer.flush() + writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) + writer.flush() - def read_data(self) -> Optional[Dict[str, Any]]: - headers = http.client.parse_headers(self._reader) # type: ignore + def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: + headers = http.client.parse_headers(reader) # type: ignore try: - body = self._reader.read(int(headers.get("Content-Length"))) + body = reader.read(int(headers.get("Content-Length"))) except TypeError: # Expected error on process stopping. Stop the read loop. raise StopLoopError() @@ -98,18 +93,15 @@ class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): _buf = bytearray() _lines = 0 - def __init__(self, conn: multiprocessing.connection._ConnectionBase): - self._conn = conn - - def write_data(self, data: Dict[str, Any]) -> None: + def write_data(self, connection: multiprocessing.connection._ConnectionBase, data: Dict[str, Any]) -> None: body = encode_payload(data) + b"\n" while len(body): - n = self._conn._write(self._conn.fileno(), body) # type: ignore + n = connection._write(connection.fileno(), body) # type: ignore body = body[n:] - def read_data(self) -> Optional[Dict[str, Any]]: + def read_data(self, connection: multiprocessing.connection._ConnectionBase) -> Optional[Dict[str, Any]]: while self._lines == 0: - chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore + chunk = connection._read(connection.fileno(), 65536) # type: ignore if len(chunk) == 0: # EOF reached: https://docs.python.org/3/library/os.html#os.read raise StopLoopError() @@ -124,16 +116,14 @@ def read_data(self) -> Optional[Dict[str, Any]]: class ProcessTransport(Transport[T]): - def __init__(self, - name: str, - process: subprocess.Popen, - socket: Optional[socket.socket], - stderr: Optional[IO[bytes]], - processor: AbstractProcessor[T], + def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: Any, + writer: Any, stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], callback_object: TransportCallbacks[T]) -> None: self._closed = False self._process = process self._socket = socket + self._reader = reader + self._writer = writer self._stderr = stderr self._processor = processor self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name)) @@ -171,8 +161,8 @@ def __del__(self) -> None: def _read_loop(self) -> None: try: - while True: - payload = self._processor.read_data() + while self._reader: + payload = self._processor.read_data(self._reader) if payload is None: continue @@ -221,11 +211,11 @@ def invoke() -> None: def _write_loop(self) -> None: exception = None # type: Optional[Exception] try: - while True: + while self._writer: d = self._send_queue.get() if d is None: break - self._processor.write_data(d) + self._processor.write_data(self._writer, d) except (BrokenPipeError, AttributeError): pass except Exception as ex: @@ -252,6 +242,10 @@ def _stderr_loop(self) -> None: exception_log('unexpected exception type in stderr loop', ex) self._send_queue.put_nowait(None) +# Can be a singleton since it doesn't hold any state. +standard_processor = StandardProcessor() +node_ipc_processor = NodeIpcProcessor() + def create_transport(config: TransportConfig, cwd: Optional[str], callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: @@ -264,14 +258,14 @@ def create_transport(config: TransportConfig, cwd: Optional[str], else: stdout = subprocess.DEVNULL stdin = subprocess.DEVNULL - elif not config.node_ipc: - stdout = subprocess.PIPE - stdin = subprocess.PIPE - else: + elif config.node_ipc: stdout = subprocess.PIPE stdin = subprocess.DEVNULL stderr = subprocess.STDOUT - pass_fds = (config.node_ipc.child_conn.fileno(),) + pass_fds = (config.node_ipc.child_connection.fileno(),) + else: + stdout = subprocess.PIPE + stdin = subprocess.PIPE startupinfo = _fixup_startup_args(config.command) sock = None # type: Optional[socket.socket] @@ -288,7 +282,6 @@ def start_subprocess() -> subprocess.Popen: config.listener_socket, start_subprocess ) - processor = StandardProcessor(reader, writer) # type: AbstractProcessor else: process = start_subprocess() if config.tcp_port: @@ -296,19 +289,24 @@ def start_subprocess() -> subprocess.Popen: if sock is None: raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) reader = writer = sock.makefile('rwb') - processor = StandardProcessor(reader, writer) - elif not config.node_ipc: + elif config.node_ipc: + reader = writer = config.node_ipc.parent_connection + else: if not process.stdout or not process.stdin: raise RuntimeError( 'Failed initializing transport: reader: {}, writer: {}' .format(process.stdout, process.stdin) ) - processor = StandardProcessor(process.stdout, process.stdin) - else: - processor = NodeIpcProcessor(config.node_ipc.parent_conn) - + reader = process.stdout + writer = process.stdin stderr_reader = process.stdout if config.node_ipc else process.stderr - return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object) + processor = node_ipc_processor if config.node_ipc else standard_processor + + if not reader or not writer: + raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) + + return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, processor, + callback_object) _subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen] diff --git a/plugin/core/types.py b/plugin/core/types.py index 9ebb810bd..0816e3ca8 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -607,7 +607,11 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: return _translate_path(uri, self._remote, self._local) -NodeIpcPipe = collections.namedtuple('NodeIpcPipe', 'parent_conn,child_conn') +class NodeIpcPipe(): + def __init__(self) -> None: + parent_connection, child_connection = multiprocessing.Pipe() + self.parent_connection = parent_connection + self.child_connection = child_connection class TransportConfig: @@ -804,8 +808,8 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig env[key] = sublime.expand_variables(value, variables) node_ipc = None if self.use_node_ipc: - node_ipc = NodeIpcPipe(*multiprocessing.Pipe()) - env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno()) + node_ipc = NodeIpcPipe() + env["NODE_CHANNEL_FD"] = str(node_ipc.child_connection.fileno()) return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) def set_view_status(self, view: sublime.View, message: str) -> None: From 47fde0d224b3a6e49faf9a92812f6369a849468b Mon Sep 17 00:00:00 2001 From: Alec Mev Date: Wed, 10 Aug 2022 14:25:50 +0300 Subject: [PATCH 05/10] Fix mypy and flake8 --- plugin/core/transports.py | 9 +++++---- plugin/core/types.py | 1 - 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 52bc58cf1..01564166b 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -49,10 +49,10 @@ def on_stderr_message(self, message: str) -> None: class AbstractProcessor(Generic[T]): - def write_data(self, writer: IO[bytes], data: T) -> None: + def write_data(self, writer: Any, data: T) -> None: raise NotImplementedError() - def read_data(self, reader: IO[bytes]) -> Optional[T]: + def read_data(self, reader: Any) -> Optional[T]: raise NotImplementedError() @@ -242,6 +242,7 @@ def _stderr_loop(self) -> None: exception_log('unexpected exception type in stderr loop', ex) self._send_queue.put_nowait(None) + # Can be a singleton since it doesn't hold any state. standard_processor = StandardProcessor() node_ipc_processor = NodeIpcProcessor() @@ -290,7 +291,7 @@ def start_subprocess() -> subprocess.Popen: raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) reader = writer = sock.makefile('rwb') elif config.node_ipc: - reader = writer = config.node_ipc.parent_connection + reader = writer = config.node_ipc.parent_connection # type: ignore else: if not process.stdout or not process.stdin: raise RuntimeError( @@ -300,7 +301,7 @@ def start_subprocess() -> subprocess.Popen: reader = process.stdout writer = process.stdin stderr_reader = process.stdout if config.node_ipc else process.stderr - processor = node_ipc_processor if config.node_ipc else standard_processor + processor = node_ipc_processor if config.node_ipc else standard_processor if not reader or not writer: raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) diff --git a/plugin/core/types.py b/plugin/core/types.py index 0816e3ca8..95700eaca 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -10,7 +10,6 @@ from wcmatch.glob import BRACE from wcmatch.glob import globmatch from wcmatch.glob import GLOBSTAR -import collections import contextlib import fnmatch import multiprocessing From 93157e7b7ddf878438a87ca3af6ed89348bb6f75 Mon Sep 17 00:00:00 2001 From: Alec Mev Date: Wed, 10 Aug 2022 15:02:34 +0300 Subject: [PATCH 06/10] Shorten use_node_ipc description --- sublime-package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sublime-package.json b/sublime-package.json index ed3b75773..5f3bb5a56 100644 --- a/sublime-package.json +++ b/sublime-package.json @@ -78,7 +78,7 @@ "ClientUseNodeIpc": { "type": "boolean", "default": false, - "markdownDescription": "Communicate with the language server over Node.js IPC. This lets the server print to stdout without disrupting the LSP communication. It's non-standard, but is used by VSCode. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` in case of vscode-eslint. `tcp_port` is ignored if this is enabled." + "markdownDescription": "Communicate with the language server over Node.js IPC. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` for some servers." }, "ClientEnabled": { "type": "boolean", From e6ae057dba05c8206bd6ab4d64c10ee00bb2fd1a Mon Sep 17 00:00:00 2001 From: Alec Mev Date: Sat, 13 Aug 2022 10:58:50 +0300 Subject: [PATCH 07/10] Fix Windows --- plugin/core/transports.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 01564166b..cc14903a2 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -13,6 +13,7 @@ import socket import sublime import subprocess +import sys import threading import time import weakref @@ -251,7 +252,8 @@ def _stderr_loop(self) -> None: def create_transport(config: TransportConfig, cwd: Optional[str], callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: stderr = subprocess.PIPE - pass_fds = () # type: Union[Tuple[()], Tuple[int]] + # https://github.com/python/cpython/blob/17bf6b4671ec02d80ad29b278639d5307baddeb5/Lib/subprocess.py#L706 + close_fds = True if sys.version_info >= (3, 8, 0) else subprocess._PLATFORM_DEFAULT_CLOSE_FDS # type: ignore if config.tcp_port is not None: assert config.tcp_port is not None if config.tcp_port < 0: @@ -263,7 +265,7 @@ def create_transport(config: TransportConfig, cwd: Optional[str], stdout = subprocess.PIPE stdin = subprocess.DEVNULL stderr = subprocess.STDOUT - pass_fds = (config.node_ipc.child_connection.fileno(),) + close_fds = False else: stdout = subprocess.PIPE stdin = subprocess.PIPE @@ -273,7 +275,7 @@ def create_transport(config: TransportConfig, cwd: Optional[str], process = None # type: Optional[subprocess.Popen] def start_subprocess() -> subprocess.Popen: - return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds) + return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, close_fds) if config.listener_socket: assert isinstance(config.tcp_port, int) and config.tcp_port > 0 @@ -356,7 +358,7 @@ def _start_subprocess( startupinfo: Any, env: Dict[str, str], cwd: Optional[str], - pass_fds: Union[Tuple[()], Tuple[int]] + close_fds: bool ) -> subprocess.Popen: debug("starting {} in {}".format(args, cwd if cwd else os.getcwd())) process = subprocess.Popen( @@ -367,7 +369,7 @@ def _start_subprocess( startupinfo=startupinfo, env=env, cwd=cwd, - pass_fds=pass_fds) + close_fds=close_fds) _subprocesses.add(process) return process From 3dc8daa1eef1d56af7b981951f85bddae5e07ebf Mon Sep 17 00:00:00 2001 From: Raoul Wols Date: Mon, 1 Jul 2024 20:29:14 +0200 Subject: [PATCH 08/10] Make duplex pipes ('node IPC') work for linux --- plugin/core/sessions.py | 6 +- plugin/core/transports.py | 759 ++++++++++++++++++++++++++------------ plugin/core/types.py | 149 ++++---- plugin/core/windows.py | 7 +- plugin/tooling.py | 11 +- pyproject.toml | 4 + 6 files changed, 607 insertions(+), 329 deletions(-) create mode 100644 pyproject.toml diff --git a/plugin/core/sessions.py b/plugin/core/sessions.py index 8d06245f9..1ab8d6a68 100644 --- a/plugin/core/sessions.py +++ b/plugin/core/sessions.py @@ -86,7 +86,7 @@ from .settings import client_configs from .settings import globalprefs from .settings import userprefs -from .transports import Transport +from .transports import TransportWrapper from .transports import TransportCallbacks from .types import Capabilities from .types import ClientConfig @@ -1255,7 +1255,7 @@ class Session(TransportCallbacks): def __init__(self, manager: Manager, logger: Logger, workspace_folders: list[WorkspaceFolder], config: ClientConfig, plugin_class: type[AbstractPlugin] | None) -> None: - self.transport: Transport | None = None + self.transport: TransportWrapper | None = None self.working_directory: str | None = None self.request_id = 0 # Our request IDs are always integers. self._logger = logger @@ -1511,7 +1511,7 @@ def initialize_async( self, variables: dict[str, str], working_directory: str | None, - transport: Transport, + transport: TransportWrapper, init_callback: InitCallback ) -> None: self.transport = transport diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 8e02cbffd..f50fa61b7 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -1,28 +1,59 @@ from __future__ import annotations from .logging import exception_log, debug -from .types import TCP_CONNECT_TIMEOUT -from .types import TransportConfig +from abc import abstractmethod from contextlib import closing from functools import partial from queue import Queue -from typing import Any, Callable, Dict, Generic, IO, Optional, Protocol, TypeVar +from typing import Any, Callable, Generic, IO, Protocol, Sequence, TypeVar import http.client import http +import io import json -import multiprocessing.connection import os import shutil import socket import sublime import subprocess -import sys +import contextlib import threading import time import weakref +import ssl -T = TypeVar('T') -T_contra = TypeVar('T_contra', contravariant=True) +TCP_CONNECT_TIMEOUT = 5 # seconds +T = TypeVar("T") +T_contra = TypeVar("T_contra", contravariant=True) + + +def _set_inheritable(inherit_file_descriptors: Sequence[int] | None, value: bool) -> None: + if inherit_file_descriptors and sublime.platform() == "windows": + for file_descriptor in inherit_file_descriptors: + os.set_handle_inheritable(file_descriptor, value) # type: ignore + + +class LaunchConfig: + __slots__ = ("command", "env") + + def __init__(self, command: list[str], env: dict[str, str] = {}) -> None: + self.command: list[str] = command + self.env: dict[str, str] = env + + def start( + self, + cwd: str | None, + stdout: int, + stdin: int, + stderr: int, + inherit_file_descriptors: Sequence[int] | None = None, + ) -> subprocess.Popen: + startupinfo = _fixup_startup_args(self.command, inherit_file_descriptors) + _set_inheritable(inherit_file_descriptors, True) + pass_fds = inherit_file_descriptors if inherit_file_descriptors and sublime.platform() != "windows" else tuple() + try: + return _start_subprocess(self.command, stdout, stdin, stderr, startupinfo, self.env, cwd, pass_fds) + finally: + _set_inheritable(inherit_file_descriptors, False) class StopLoopError(Exception): @@ -30,8 +61,15 @@ class StopLoopError(Exception): class Transport(Generic[T]): + def __init__(self, encoder: Callable[[T], bytes], decoder: Callable[[bytes], T], http_headers: bool) -> None: + self._encoder = encoder + self._decoder = decoder + self._http_headers = http_headers - def send(self, payload: T) -> None: + def read(self) -> T: + raise NotImplementedError() + + def write(self, payload: T) -> None: raise NotImplementedError() def close(self) -> None: @@ -39,113 +77,85 @@ def close(self) -> None: class TransportCallbacks(Protocol[T_contra]): + def on_transport_close(self, exit_code: int, exception: Exception | None) -> None: ... - def on_transport_close(self, exit_code: int, exception: Exception | None) -> None: - ... + def on_payload(self, payload: T_contra) -> None: ... - def on_payload(self, payload: T_contra) -> None: - ... + def on_stderr_message(self, message: str) -> None: ... - def on_stderr_message(self, message: str) -> None: - ... +def _join_thread(t: threading.Thread) -> None: + if t.ident == threading.current_thread().ident: + return + try: + t.join(2) + except TimeoutError as ex: + exception_log(f"failed to join {t.name} thread", ex) -class AbstractProcessor(Generic[T]): - - def write_data(self, writer: Any, data: T) -> None: - raise NotImplementedError() - - def read_data(self, reader: IO[bytes]) -> T | None: - raise NotImplementedError() +class ErrorReader(Generic[T]): + """ + Responsible for relaying log messages from a raw stream to a (subclass of) TransportCallbacks. Because the various + transport configurations want to listen to different streams, perhaps completely separate from the regular RPC + transport, this is wrapped in a different class. For instance, a TCP client transport communicating via a socket, + while it listens for log messages on the stdout/stderr streams of a spawned child process. + """ -class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): + def __init__(self, callback_object: TransportCallbacks[T], reader: IO[bytes]) -> None: + self._callback_object = weakref.ref(callback_object) + self._reader = reader + self._thread = threading.Thread(target=self._loop) + self._thread.start() - def write_data(self, writer: IO[bytes], data: dict[str, Any]) -> None: - body = self._encode(data) - writer.writelines((f"Content-Length: {len(body)}\r\n\r\n".encode('ascii'), body)) + def __del__(self) -> None: + _join_thread(self._thread) - def read_data(self, reader: IO[bytes]) -> dict[str, Any] | None: - headers = http.client.parse_headers(reader) # type: ignore + def _loop(self) -> None: try: - body = reader.read(int(headers.get("Content-Length"))) - except TypeError: - if str(headers) == '\n': - # Expected on process stopping. Gracefully stop the transport. - raise StopLoopError() - else: - # Propagate server's output to the UI. - raise Exception(f"Unexpected payload in server's stdout:\n\n{headers}") - try: - return self._decode(body) + while self._reader: + message = self._reader.readline().decode("utf-8", "replace") + if message == "": + continue + callback_object = self._callback_object() + if callback_object: + callback_object.on_stderr_message(message.rstrip()) + else: + break + except (BrokenPipeError, AttributeError): + pass except Exception as ex: - raise Exception(f"JSON decode error: {ex}") - - @staticmethod - def _encode(data: dict[str, Any]) -> bytes: - return json.dumps( - data, - ensure_ascii=False, - sort_keys=False, - check_circular=False, - separators=(',', ':') - ).encode('utf-8') - - @staticmethod - def _decode(message: bytes) -> dict[str, Any]: - return json.loads(message.decode('utf-8')) - - -class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): - _buf: bytearray - _lines: int - - def __init__(self) -> None: - self._buf = bytearray() - self._lines = 0 - - def write_data(self, writer: multiprocessing.connection._ConnectionBase, data: dict[str, Any]) -> None: - body = JsonRpcProcessor._encode(data) + b"\n" - while len(body): - n = writer._write(writer.fileno(), body) # type: ignore - body = body[n:] - - def read_data(self, reader: multiprocessing.connection._ConnectionBase) -> dict[str, Any] | None: - while self._lines == 0: - chunk = reader._read(reader.fileno(), 65536) # type: ignore - if len(chunk) == 0: - # EOF reached: https://docs.python.org/3/library/os.html#os.read - raise StopLoopError() - - self._buf += chunk - self._lines += chunk.count(b'\n') - - self._lines -= 1 - message, _, self._buf = self._buf.partition(b'\n') - return JsonRpcProcessor._decode(message) - - -class ProcessTransport(Transport[T]): - - def __init__(self, name: str, process: subprocess.Popen | None, socket: socket.socket | None, - reader: IO[bytes], writer: IO[bytes], stderr: IO[bytes] | None, - processor: AbstractProcessor[T], callback_object: TransportCallbacks[T]) -> None: + exception_log("unexpected exception type in error reader", ex) + + +class TransportWrapper(Generic[T]): + """ + Double dispatch-like class that takes a (subclass of) Transport, and provides to a (subclass of) TransportCallbacks + appropriately decoded messages of type T. The TransportWrapper is also responsible for keeping the spawned child + process around (if any), and also keeps track of the ErrorReader. It can be the case that there is no ErrorReader, + for instance when talking to a remote TCP language server. So it can be None. + """ + + def __init__( + self, + callback_object: TransportCallbacks[T], + transport: Transport[T], + process: subprocess.Popen | None, + error_reader: ErrorReader | None, + ) -> None: self._closed = False - self._process = process - self._socket = socket - self._reader = reader - self._writer = writer - self._stderr = stderr - self._processor = processor - self._reader_thread = threading.Thread(target=self._read_loop, name=f'{name}-reader') - self._writer_thread = threading.Thread(target=self._write_loop, name=f'{name}-writer') self._callback_object = weakref.ref(callback_object) + self._transport = transport + self._process = process + self._error_reader = error_reader + self._reader_thread = threading.Thread(target=self._read_loop) + self._writer_thread = threading.Thread(target=self._write_loop) self._send_queue: Queue[T | None] = Queue(0) self._reader_thread.start() self._writer_thread.start() - if stderr: - self._stderr_thread = threading.Thread(target=self._stderr_loop, name=f'{name}-stderr') - self._stderr_thread.start() + + @property + def process_args(self) -> Any: + return self._process.args if self._process else None def send(self, payload: T) -> None: self._send_queue.put_nowait(payload) @@ -153,30 +163,19 @@ def send(self, payload: T) -> None: def close(self) -> None: if not self._closed: self._send_queue.put_nowait(None) - if self._socket: - self._socket.close() + self._transport.close() self._closed = True - def _join_thread(self, t: threading.Thread) -> None: - if t.ident == threading.current_thread().ident: - return - try: - t.join(2) - except TimeoutError as ex: - exception_log(f"failed to join {t.name} thread", ex) - def __del__(self) -> None: self.close() - self._join_thread(self._writer_thread) - self._join_thread(self._reader_thread) - if self._stderr_thread: - self._join_thread(self._stderr_thread) + _join_thread(self._writer_thread) + _join_thread(self._reader_thread) def _read_loop(self) -> None: exception = None try: - while self._reader: - payload = self._processor.read_data(self._reader) + while True: + payload = self._transport.read() if payload is None: continue @@ -229,101 +228,417 @@ def invoke() -> None: def _write_loop(self) -> None: exception: Exception | None = None try: - while self._writer: + while True: d = self._send_queue.get() if d is None: break - self._processor.write_data(self._writer, d) + self._transport.write(d) except (BrokenPipeError, AttributeError): pass except Exception as ex: exception = ex self._end(exception) - def _stderr_loop(self) -> None: + +def encode_json(data: dict[str, Any]) -> bytes: + return json.dumps( + data, + ensure_ascii=False, + sort_keys=False, + check_circular=False, + separators=(",", ":"), + ).encode("utf-8") + + +def decode_json(message: bytes) -> dict[str, Any]: + return json.loads(message.decode("utf-8")) + + +class FileObjectTransport(Transport[T]): + def __init__( + self, + encoder: Callable[[T], bytes], + decoder: Callable[[bytes], T], + http_headers: bool, + reader: io.BufferedIOBase, + writer: io.BufferedIOBase, + ) -> None: + super().__init__(encoder, decoder, http_headers) + self._reader = reader + self._writer = writer + + def read(self) -> T: + headers: http.client.HTTPMessage | None = None try: - while self._stderr: - if self._closed: - # None message already posted, just return - return - message = self._stderr.readline().decode('utf-8', 'replace') - if message == '': - continue - callback_object = self._callback_object() - if callback_object: - callback_object.on_stderr_message(message.rstrip()) - else: - break - except (BrokenPipeError, AttributeError): - pass + if self._http_headers: + headers = http.client.parse_headers(self._reader) + content_length = headers.get("Content-Length") + if not isinstance(content_length, str): + raise Exception("missing Content-Length header") + body = self._reader.read(int(content_length)) + else: + body = self._reader.readline() + if not body or body == b"\n": + raise StopLoopError() + except TypeError: + if str(headers) == "\n": + # Expected on process stopping. Gracefully stop the transport. + raise StopLoopError() + else: + # Propagate server's output to the UI. + raise Exception(f"Unexpected payload in server's stdout:\n\n{headers}") + try: + return self._decoder(body) except Exception as ex: - exception_log('unexpected exception type in stderr loop', ex) - self._send_queue.put_nowait(None) + raise Exception(f"JSON decode error: {ex}") + + def write(self, payload: T) -> None: + body = self._encoder(payload) + if self._http_headers: + self._writer.writelines((f"Content-Length: {len(body)}\r\n\r\n".encode("ascii"), body)) + else: + self._writer.writelines((body, b"\n")) + self._writer.flush() + def close(self) -> None: + self._writer.close() + self._reader.close() -# Can be a singleton since it doesn't hold any state. -standard_processor = JsonRpcProcessor() -node_ipc_processor = NodeIpcProcessor() +class SocketTransport(FileObjectTransport[T]): + def __init__( + self, encoder: Callable[[T], bytes], decoder: Callable[[bytes], T], http_headers: bool, sock: socket.socket + ) -> None: + reader_writer_pair: io.BufferedRWPair = sock.makefile("rwb") + super().__init__(encoder, decoder, http_headers, reader_writer_pair, reader_writer_pair) + self._socket = sock -def create_transport(config: TransportConfig, cwd: str | None, - callback_object: TransportCallbacks) -> Transport[dict[str, Any]]: - close_fds = True - if config.tcp_port is not None: - if config.tcp_port < 0: - stdout = subprocess.PIPE + def close(self) -> None: + super().close() + self._socket.close() + + +class WebSocketTransport(Transport[T]): + def read(self) -> T: + raise NotImplementedError() + + def write(self, payload: T) -> None: + raise NotImplementedError() + + def close(self) -> None: + raise NotImplementedError() + + +class DuplexPipeTransport(SocketTransport[T]): + def __init__( + self, + encoder: Callable[[T], bytes], + decoder: Callable[[bytes], T], + http_headers: bool, + sock1: socket.socket, + sock2: socket.socket, + ) -> None: + super().__init__(encoder, decoder, http_headers, sock2) + self._sock1 = sock1 + + def close(self) -> None: + super().close() + self._sock1.close() + + +class TransportConfig: + """ + Responsible for instantiating a TransportWrapper, which is the object that does the actual RPC communication. + """ + + __slots__ = ("_http_headers",) + + def __init__(self, http_headers: bool = True) -> None: + self._http_headers = http_headers + + @property + def http_headers(self) -> bool: + return self._http_headers + + def requires_launch_config(self) -> bool: + return False + + def _resolve_launch_config( + self, + command: list[str], + env: dict[str, str | list[str]] | None, + variables: dict[str, str], + ) -> LaunchConfig: + command = sublime.expand_variables(command, variables) + command = [os.path.expanduser(arg) for arg in command] + resolved_env = os.environ.copy() + for key, value in env.items() if isinstance(env, dict) else {}: + if isinstance(value, list): + value = os.path.pathsep.join(value) + if key == "PATH": + resolved_env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + resolved_env[key] + else: + resolved_env[key] = sublime.expand_variables(value, variables) + return LaunchConfig(command, resolved_env) + + @abstractmethod + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[dict[str, Any]], + ) -> TransportWrapper[dict[str, Any]]: + raise NotImplementedError() + + +class StdioTransportConfig(TransportConfig): + """ + The simplest of transport configs: launch the subprocess and communicate with it over standard I/O. This transport + config requires a "command". This is the default transport config when only a "command" is specified in the + ClientConfig. + """ + + __slots__ = () + + def __init__(self, http_headers: bool) -> None: + super().__init__(http_headers) + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[dict[str, Any]], + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + process = self._resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + return TransportWrapper( + callback_object=callbacks, + transport=FileObjectTransport(encode_json, decode_json, self.http_headers, process.stdout, process.stdin), # type: ignore # noqa: E501 + process=process, + error_reader=ErrorReader(callbacks, process.stderr), # type: ignore + ) + + +class TcpClientTransportConfig(TransportConfig): + """ + Transport for communicating to a language server that expects incoming client connections. The language server acts + as the TCP server, this text editor acts as the TCP client. One can have a "command" with this transport + configuration. In that case the subprocess is launched, and then the TCP connection is attempted. If no "command" is + given, a TCP connection is still made. This can be used for cases where the language server is already running as + part of some larger application, like Godot Editor. + """ + + __slots__ = ("_hostname", "_port") + + def __init__(self, http_headers: bool, hostname: str | None, port: int | None) -> None: + super().__init__(http_headers) + self._hostname = hostname + self._port = port + if isinstance(self._port, int) and self._port <= 0: + raise RuntimeError("invalid port number") + + def _connect(self, port: int) -> socket.socket: + start_time = time.time() + last_exception: Exception | None = None + while time.time() - start_time < TCP_CONNECT_TIMEOUT: + try: + return socket.create_connection((self._hostname or "", port)) + except Exception as ex: + last_exception = ex + if last_exception: + raise last_exception else: - stdout = subprocess.DEVNULL - stdout = subprocess.PIPE - stdin = subprocess.DEVNULL - stderr = subprocess.PIPE - elif config.node_ipc: - stdout = subprocess.PIPE - stdin = subprocess.DEVNULL - stderr = subprocess.STDOUT - close_fds = False - else: - stdout = subprocess.PIPE - stdin = subprocess.PIPE - stderr = subprocess.PIPE - sock: socket.socket | None = None - process: subprocess.Popen | None = None - - def start_subprocess() -> subprocess.Popen: - startupinfo = _fixup_startup_args(config.command) - return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, close_fds) - - if config.listener_socket: - assert isinstance(config.tcp_port, int) and config.tcp_port > 0 - if config.command: - process, sock, reader, writer = _start_subprocess_and_await_connection( - config.listener_socket, start_subprocess + raise RuntimeError("failed to connect") + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[dict[str, Any]], + ) -> TransportWrapper: + port = _add_and_resolve_port_variable(variables, self._port) + if command: + process = self._resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, ) + error_reader: ErrorReader | None = ErrorReader(callbacks, process.stdout) # type: ignore else: - sock, reader, writer = _await_client_connection(config.listener_socket) - else: - if config.command: - process = start_subprocess() - elif not config.tcp_port: - raise RuntimeError("Failed to provide command or tcp_port, at least one of them has to be configured") - if config.tcp_port: - sock = _connect_tcp(config.tcp_port) - if sock is None: - raise RuntimeError(f"Failed to connect on port {config.tcp_port}") - reader = sock.makefile('rwb') # type: ignore - writer = reader - elif config.node_ipc: - reader = writer = config.node_ipc.parent_connection # type: ignore - else: - reader = process.stdout # type: ignore - writer = process.stdin # type: ignore - stderr_reader = process.stdout if config.node_ipc else process.stderr - processor = node_ipc_processor if config.node_ipc else standard_processor - if not reader or not writer: - raise RuntimeError(f'Failed initializing transport: reader: {reader}, writer: {writer}') - stderr = process.stderr if process else None - return ProcessTransport( - config.name, process, sock, reader, writer, stderr, processor, callback_object) # type: ignore + process = None + error_reader = None + return TransportWrapper( + callback_object=callbacks, + transport=SocketTransport(encode_json, decode_json, self.http_headers, self._connect(port)), + process=process, + error_reader=error_reader, + ) + + +class TcpServerTransportConfig(TransportConfig): + """ + Transport for communicating to a language server over TCP. The difference, however, is that this transport will + start a TCP listener socket accepting new TCP cliet connections. Once a client connects to this text editor acting + as the TCP server, we'll assume it's the language server we just launched. As such, this tranport requires a + "command" for starting the language server subprocess. + """ + + __slots__ = ("_port",) + + def __init__(self, http_headers: bool, port: int | None = None) -> None: + super().__init__(http_headers) + self._port = port + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[dict[str, Any]], + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + port = _add_and_resolve_port_variable(variables, self._port) + launch = self._resolve_launch_config(command, env, variables) + listener_socket = socket.socket() + listener_socket.bind(("", port)) + listener_socket.settimeout(TCP_CONNECT_TIMEOUT) + listener_socket.listen(TCP_CONNECT_TIMEOUT) + process: subprocess.Popen | None = None + + # We need to be able to start the process while also awaiting a client connection. + def start_in_background() -> None: + nonlocal process + # Sleep for one second, because the listener socket needs to be in the "accept" state before starting the + # subprocess. This is hacky, and will get better when we can use asyncio. + time.sleep(1) + process = launch.start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + + thread = threading.Thread(target=start_in_background) + thread.start() + with closing(listener_socket): + # Await one client connection (blocking!) + sock, _ = listener_socket.accept() + thread.join() + + error_reader = ErrorReader(callbacks, process.stdout) # type: ignore + return TransportWrapper( + callback_object=callbacks, + transport=SocketTransport(encode_json, decode_json, self.http_headers, sock), + process=process, + error_reader=error_reader, + ) + + +class TlsClientTransportConfig(TcpClientTransportConfig): + """ + Exactly like the TCP client transport, except we wrap the communication in secure TLS/SSL. + """ + + __slots__ = () + + def _connect(self, port: int) -> socket.socket: + # TODO: Check if a call to ssl.create_default_context() is needed here. + return ssl.wrap_socket(super()._connect(port)) + + +class WebSocketClientTransportConfig(TransportConfig): + """ + Transport configuration for connecting, as an HTTP(S) client, to an HTTP(S) server. The HTTP(S) server is expected + to make the WebSocket upgrade negotiation, after which we upgrade to WebSocket and will then start talking the LSP + protocol. This transport can have a "command", in which case we start the subprocess using the provided "command", + and then start the websocket connection. + """ + + __slots__ = ("_hostname", "_port", "_secure") + + def __init__( + self, + http_headers: bool, + hostname: str | None, + port: int | None, + secure: bool = False, + ) -> None: + super().__init__(http_headers) + self._hostname = hostname + self._port = port + self._secure = secure + + @property + def port(self) -> int: + if isinstance(self._port, int): + return self._port + return http.client.HTTPS_PORT if self._secure else http.client.HTTP_PORT + + +class DuplexPipeTransportConfig(TransportConfig): + """ + Transport configuration for communicating with a process using a "duplex pipe" construction. The spawned subprocess + is informed of the pipe's file descriptor with an environment variable. The pipe file descriptor handle is inherited + by the child process. + + On Linux and macOS, this is implemented using AF_UNIX socketpairs: + https://www.man7.org/linux/man-pages/man7/unix.7.html + + !!! TODO !!! + On Windows, this is implemented using NamedPipes: https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipes + """ + + __slots__ = ("_child_fileno_env_key",) + + def __init__(self, http_headers: bool, child_fileno_env_key: str) -> None: + super().__init__(http_headers) + self._child_fileno_env_key = child_fileno_env_key + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[dict[str, Any]], + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + if env is None: + env = {} + # !!! TODO !!! windows named pipes + sock1, sock2 = socket.socketpair() + sock1.set_inheritable(True) + env[self._child_fileno_env_key] = str(sock1.fileno()) + process = self._resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + inherit_file_descriptors=(sock1.fileno(),), + ) + error_reader = ErrorReader(callbacks, process.stdout) # type: ignore + return TransportWrapper( + callback_object=callbacks, + transport=DuplexPipeTransport(encode_json, decode_json, self.http_headers, sock1, sock2), + process=process, + error_reader=error_reader, + ) _subprocesses: weakref.WeakSet[subprocess.Popen] = weakref.WeakSet() @@ -344,10 +659,12 @@ def kill_all_subprocesses() -> None: pass -def _fixup_startup_args(args: list[str]) -> Any: +def _fixup_startup_args(args: list[str], inherit_file_descriptors: Sequence[int] | None = None) -> Any: startupinfo = None if sublime.platform() == "windows": startupinfo = subprocess.STARTUPINFO() # type: ignore + if inherit_file_descriptors: + startupinfo.lpAttributeList = {"handle_list": inherit_file_descriptors} startupinfo.dwFlags |= subprocess.SW_HIDE | subprocess.STARTF_USESHOWWINDOW # type: ignore executable_arg = args[0] _, ext = os.path.splitext(executable_arg) @@ -357,7 +674,7 @@ def _fixup_startup_args(args: list[str]) -> Any: # node has .cmd # dart has .bat # python has .exe wrappers - not needed - for extension in ['.cmd', '.bat']: + for extension in [".cmd", ".bat"]: if path_to_executable and path_to_executable.lower().endswith(extension): args[0] = executable_arg + extension break @@ -372,9 +689,11 @@ def _start_subprocess( startupinfo: Any, env: dict[str, str], cwd: str | None, - close_fds: bool + pass_fds: Sequence[int], ) -> subprocess.Popen: debug(f"starting {args} in {cwd if cwd else os.getcwd()}") + if pass_fds: + debug(f"inheriting file descriptors: {pass_fds}") process = subprocess.Popen( args=args, stdin=stdin, @@ -383,46 +702,22 @@ def _start_subprocess( startupinfo=startupinfo, env=env, cwd=cwd, - close_fds=close_fds) + pass_fds=pass_fds, + ) + debug("hello world") _subprocesses.add(process) return process -def _await_client_connection(listener_socket: socket.socket) -> tuple[socket.socket, IO[bytes], IO[bytes]]: - with closing(listener_socket): - # Await one client connection (blocking!) - sock, _ = listener_socket.accept() - reader = sock.makefile('rwb') # type: ignore - writer = reader - return sock, reader, writer # type: ignore +def _find_free_port() -> int: + with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] -def _start_subprocess_and_await_connection( - listener_socket: socket.socket, subprocess_starter: Callable[[], subprocess.Popen] -) -> tuple[subprocess.Popen, socket.socket, IO[bytes], IO[bytes]]: - process = None - - # We need to be able to start the process while also awaiting a client connection. - def start_in_background() -> None: - nonlocal process - # Sleep for one second, because the listener socket needs to be in the "accept" state before starting the - # subprocess. This is hacky, and will get better when we can use asyncio. - time.sleep(1) - process = subprocess_starter() - - thread = threading.Thread(target=start_in_background) - thread.start() - sock, reader, writer = _await_client_connection(listener_socket) - thread.join() - assert process is not None - return process, sock, reader, writer # type: ignore - - -def _connect_tcp(port: int) -> socket.socket | None: - start_time = time.time() - while time.time() - start_time < TCP_CONNECT_TIMEOUT: - try: - return socket.create_connection(('localhost', port)) - except ConnectionRefusedError: - pass - return None +def _add_and_resolve_port_variable(variables: dict[str, str], port: int | None) -> int: + if port is None: + port = _find_free_port() + variables["port"] = str(port) + return port diff --git a/plugin/core/types.py b/plugin/core/types.py index 012fa9511..cf202e2ac 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -3,6 +3,13 @@ from .file_watcher import FileWatcherEventType from .logging import debug, set_debug_logging from .protocol import TextDocumentSyncKind +from .transports import DuplexPipeTransportConfig +from .transports import StdioTransportConfig +from .transports import TcpClientTransportConfig +from .transports import TcpServerTransportConfig +from .transports import TlsClientTransportConfig +from .transports import TransportConfig +from .transports import WebSocketClientTransportConfig from .url import filename_to_uri from .url import parse_uri from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, TypedDict, TypeVar, Union @@ -12,15 +19,11 @@ from wcmatch.glob import GLOBSTAR import contextlib import fnmatch -import multiprocessing -import os import posixpath -import socket import sublime import time -TCP_CONNECT_TIMEOUT = 5 # seconds FEATURES_TIMEOUT = 300 # milliseconds WORKSPACE_DIAGNOSTICS_TIMEOUT = 3000 # milliseconds @@ -642,35 +645,6 @@ def map_from_remote_to_local(self, uri: str) -> tuple[str, bool]: return _translate_path(uri, self._remote, self._local) -class NodeIpcPipe: - def __init__(self) -> None: - parent_connection, child_connection = multiprocessing.Pipe() - self.parent_connection = parent_connection - self.child_connection = child_connection - - -class TransportConfig: - __slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc") - - def __init__( - self, - name: str, - command: list[str], - tcp_port: int | None, - env: dict[str, str], - listener_socket: socket.socket | None, - node_ipc: NodeIpcPipe | None - ) -> None: - if not command and not tcp_port: - raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') - self.name = name - self.command = command - self.tcp_port = tcp_port - self.env = env - self.listener_socket = listener_socket - self.node_ipc = node_ipc - - class ClientConfig: def __init__(self, name: str, @@ -678,9 +652,14 @@ def __init__(self, priority_selector: str | None = None, schemes: list[str] | None = None, command: list[str] | None = None, - use_node_ipc: bool = False, binary_args: list[str] | None = None, # DEPRECATED + hostname: str | None = None, tcp_port: int | None = None, + use_tls: bool | None = None, + websocket: bool | None = None, + http_headers: bool = True, + pipe_fileno_env_key: str | None = None, + use_node_ipc: bool = False, auto_complete_selector: str | None = None, enabled: bool = True, init_options: DottedDict = DottedDict(), @@ -704,8 +683,18 @@ def __init__(self, else: assert isinstance(binary_args, list) self.command = binary_args - self.use_node_ipc = use_node_ipc + self.hostname = hostname self.tcp_port = tcp_port + self.use_tls = use_tls + self.websocket = websocket + # "use_node_ipc" is a convenience bool setting that modifies the http_header and pipe_fileno_env_key settings. + self.use_node_ipc = use_node_ipc + if self.use_node_ipc: + self.http_headers = False + self.pipe_fileno_env_key = "NODE_CHANNEL_FD" + else: + self.http_headers = http_headers + self.pipe_fileno_env_key = pipe_fileno_env_key self.auto_complete_selector = auto_complete_selector self.enabled = enabled self.init_options = init_options @@ -739,8 +728,13 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> Cli priority_selector=_read_priority_selector(s), schemes=s.get("schemes"), command=read_list_setting(s, "command", []), - use_node_ipc=bool(s.get("use_node_ipc", False)), + hostname=s.get("hostname"), tcp_port=s.get("tcp_port"), + use_tls=bool(s.get("use_tls", False)), + websocket=bool(s.get("websocket", False)), + http_headers=bool(s.get("http_headers", True)), + pipe_fileno_env_key=s.get("pipe_fileno_env_key", None), + use_node_ipc=bool(s.get("use_node_ipc", False)), auto_complete_selector=s.get("auto_complete_selector"), # Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package. enabled=bool(s.get("enabled", True)), @@ -771,8 +765,13 @@ def from_dict(cls, name: str, d: dict[str, Any]) -> ClientConfig: priority_selector=_read_priority_selector(d), schemes=schemes, command=d.get("command", []), - use_node_ipc=d.get("use_node_ipc", False), + hostname=d.get("hostname"), tcp_port=d.get("tcp_port"), + use_tls=bool(d.get("use_tls", False)), + websocket=bool(d.get("websocket", False)), + http_headers=bool(d.get("http_headers", True)), + pipe_fileno_env_key=d.get("pipe_fileno_env_key", False), + use_node_ipc=bool(d.get("use_node_ipc", False)), auto_complete_selector=d.get("auto_complete_selector"), enabled=d.get("enabled", False), init_options=DottedDict(d.get("initializationOptions")), @@ -800,8 +799,12 @@ def from_config(cls, src_config: ClientConfig, override: dict[str, Any]) -> Clie priority_selector=_read_priority_selector(override) or src_config.priority_selector, schemes=override.get("schemes", src_config.schemes), command=override.get("command", src_config.command), - use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), + hostname=override.get("hostname", src_config.hostname), tcp_port=override.get("tcp_port", src_config.tcp_port), + use_tls=override.get("use_tls", src_config.use_tls), + websocket=override.get("use_tls", src_config.use_tls), + http_headers=override.get("http_headers", src_config.http_headers), + use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector), enabled=override.get("enabled", src_config.enabled), init_options=DottedDict.from_base_and_override( @@ -817,40 +820,33 @@ def from_config(cls, src_config: ClientConfig, override: dict[str, Any]) -> Clie path_maps=path_map_override if path_map_override else src_config.path_maps ) - def resolve_transport_config(self, variables: dict[str, str]) -> TransportConfig: - tcp_port: int | None = None - listener_socket: socket.socket | None = None + def create_transport_config(self) -> TransportConfig: + """ + Create a (subclass of) TransportConfig that is able to start a TransportWrapper. + """ if self.tcp_port is not None: - # < 0 means we're hosting a TCP server if self.tcp_port < 0: - # -1 means pick any free port - if self.tcp_port < -1: - tcp_port = -self.tcp_port - # Create a listener socket for incoming connections - listener_socket = _start_tcp_listener(tcp_port) - tcp_port = int(listener_socket.getsockname()[1]) + return TcpServerTransportConfig(self.http_headers, None if self.tcp_port == -1 else -self.tcp_port) + elif self.use_tls: + return TlsClientTransportConfig( + self.http_headers, + self.hostname, + None if self.tcp_port == 0 else self.tcp_port, + ) else: - tcp_port = _find_free_port() if self.tcp_port == 0 else self.tcp_port - if tcp_port is not None: - variables["port"] = str(tcp_port) - command = sublime.expand_variables(self.command, variables) - command = [os.path.expanduser(arg) for arg in command] - if tcp_port is not None: - # DEPRECATED -- replace {port} with $port or ${port} in your client config - command = [a.replace('{port}', str(tcp_port)) for a in command] - env = os.environ.copy() - for key, value in self.env.items(): - if isinstance(value, list): - value = os.path.pathsep.join(value) - if key == 'PATH': - env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] - else: - env[key] = sublime.expand_variables(value, variables) - node_ipc = None - if self.use_node_ipc: - node_ipc = NodeIpcPipe() - env["NODE_CHANNEL_FD"] = str(node_ipc.child_connection.fileno()) - return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) + return TcpClientTransportConfig( + self.http_headers, + self.hostname, + None if self.tcp_port == 0 else self.tcp_port, + ) + elif self.pipe_fileno_env_key: + return DuplexPipeTransportConfig(self.http_headers, self.pipe_fileno_env_key) + elif self.websocket: + return WebSocketClientTransportConfig( + self.http_headers, self.hostname, self.tcp_port, bool(self.use_tls) + ) + else: + return StdioTransportConfig(self.http_headers) def set_view_status(self, view: sublime.View, message: str) -> None: if sublime.load_settings("LSP.sublime-settings").get("show_view_status"): @@ -1022,18 +1018,3 @@ def _read_priority_selector(config: sublime.Settings | dict[str, Any]) -> str: if language_id: return f"source.{language_id}" return "" - - -def _find_free_port() -> int: - with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(('', 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - return s.getsockname()[1] - - -def _start_tcp_listener(tcp_port: int | None) -> socket.socket: - sock = socket.socket() - sock.bind(('localhost', tcp_port or 0)) - sock.settimeout(TCP_CONNECT_TIMEOUT) - sock.listen(1) - return sock diff --git a/plugin/core/windows.py b/plugin/core/windows.py index 29abdea97..1abdb88a2 100644 --- a/plugin/core/windows.py +++ b/plugin/core/windows.py @@ -24,7 +24,6 @@ from .sessions import Session from .settings import client_configs from .settings import userprefs -from .transports import create_transport from .types import ClientConfig from .types import matches_pattern from .types import sublime_pattern_to_glob @@ -276,8 +275,8 @@ def start_async(self, config: ClientConfig, initiating_view: sublime.View) -> No transport_cwd: str | None = cwd else: transport_cwd = workspace_folders[0].path if workspace_folders else None - transport_config = config.resolve_transport_config(variables) - transport = create_transport(transport_config, transport_cwd, session) + transport = config.create_transport_config().start( + config.command, config.env, transport_cwd, variables, session) if plugin_class: plugin_class.on_post_start(self._window, initiating_view, workspace_folders, config) config.set_view_status(initiating_view, "initialize") @@ -294,7 +293,7 @@ def start_async(self, config: ClientConfig, initiating_view: sublime.View) -> No "Re-enable by running \"LSP: Enable Language Server In Project\" from the Command Palette.", "\n\n--- Error: ---\n{1}" )).format(config.name, str(e)) - exception_log(f"Unable to start subprocess for {config.name}", e) + exception_log(f"Unable to initialize language server for {config.name}", e) if isinstance(e, CalledProcessError): print("Server output:\n{}".format(e.output.decode('utf-8', 'replace'))) self._config_manager.disable_config(config.name, only_for_session=True) diff --git a/plugin/tooling.py b/plugin/tooling.py index acfb42c0b..f4581bcc5 100644 --- a/plugin/tooling.py +++ b/plugin/tooling.py @@ -3,9 +3,8 @@ from .core.logging import debug from .core.registry import windows from .core.sessions import get_plugin -from .core.transports import create_transport -from .core.transports import Transport from .core.transports import TransportCallbacks +from .core.transports import TransportWrapper from .core.types import Capabilities from .core.types import ClientConfig from .core.version import __version__ @@ -495,7 +494,7 @@ def __init__( on_close: Callable[[list[str], str, int], None] ) -> None: self._on_close = on_close - self._transport: Transport | None = None + self._transport: TransportWrapper | None = None self._resolved_command: list[str] = [] self._stderr_lines: list[str] = [] try: @@ -516,9 +515,9 @@ def __init__( cwd = plugin_class.on_pre_start(window, initiating_view, workspace_folders, config) if not cwd and workspace_folders: cwd = workspace_folders[0].path - transport_config = config.resolve_transport_config(variables) - self._resolved_command = transport_config.command - self._transport = create_transport(transport_config, cwd, self) + transport_config = config.create_transport_config() + self._transport = transport_config.start(config.command, config.env, cwd, variables, self) + self._resolved_command = self._transport.process_args sublime.set_timeout_async(self.force_close_transport, self.CLOSE_TIMEOUT_SEC * 1000) except Exception as ex: self.on_transport_close(-1, ex) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..2190e6121 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.ruff] +line-length = 120 +indent-width = 4 +target-version = "py38" From 06365a3d061ea939215f49faebb7a309903ab1fa Mon Sep 17 00:00:00 2001 From: Raoul Wols Date: Mon, 1 Jul 2024 20:35:28 +0200 Subject: [PATCH 09/10] Minor fixes --- plugin/core/sessions.py | 7 ++++--- plugin/core/transports.py | 19 ++++++++++--------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/plugin/core/sessions.py b/plugin/core/sessions.py index d8f28e795..569fa55c4 100644 --- a/plugin/core/sessions.py +++ b/plugin/core/sessions.py @@ -86,6 +86,7 @@ from .settings import client_configs from .settings import globalprefs from .settings import userprefs +from .transports import Json from .transports import TransportWrapper from .transports import TransportCallbacks from .types import Capabilities @@ -1259,11 +1260,11 @@ def check_applicable(self, sb: SessionBufferProtocol) -> None: _PARTIAL_RESULT_PROGRESS_PREFIX = "$ublime-partial-result-progress-" -class Session(TransportCallbacks): +class Session(TransportCallbacks[Json]): def __init__(self, manager: Manager, logger: Logger, workspace_folders: list[WorkspaceFolder], config: ClientConfig, plugin_class: type[AbstractPlugin] | None) -> None: - self.transport: TransportWrapper | None = None + self.transport: TransportWrapper[Json] | None = None self.working_directory: str | None = None self.request_id = 0 # Our request IDs are always integers. self._logger = logger @@ -1519,7 +1520,7 @@ def initialize_async( self, variables: dict[str, str], working_directory: str | None, - transport: TransportWrapper, + transport: TransportWrapper[Json], init_callback: InitCallback ) -> None: self.transport = transport diff --git a/plugin/core/transports.py b/plugin/core/transports.py index f50fa61b7..8451030dc 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -4,7 +4,7 @@ from contextlib import closing from functools import partial from queue import Queue -from typing import Any, Callable, Generic, IO, Protocol, Sequence, TypeVar +from typing import Any, Callable, Dict, Generic, IO, Protocol, Sequence, TypeVar import http.client import http import io @@ -24,6 +24,7 @@ TCP_CONNECT_TIMEOUT = 5 # seconds T = TypeVar("T") T_contra = TypeVar("T_contra", contravariant=True) +Json = Dict[str, Any] def _set_inheritable(inherit_file_descriptors: Sequence[int] | None, value: bool) -> None: @@ -240,7 +241,7 @@ def _write_loop(self) -> None: self._end(exception) -def encode_json(data: dict[str, Any]) -> bytes: +def encode_json(data: Json) -> bytes: return json.dumps( data, ensure_ascii=False, @@ -250,7 +251,7 @@ def encode_json(data: dict[str, Any]) -> bytes: ).encode("utf-8") -def decode_json(message: bytes) -> dict[str, Any]: +def decode_json(message: bytes) -> Json: return json.loads(message.decode("utf-8")) @@ -388,8 +389,8 @@ def start( env: dict[str, str | list[str]] | None, cwd: str | None, variables: dict[str, str], - callbacks: TransportCallbacks[dict[str, Any]], - ) -> TransportWrapper[dict[str, Any]]: + callbacks: TransportCallbacks[Json], + ) -> TransportWrapper[Json]: raise NotImplementedError() @@ -411,7 +412,7 @@ def start( env: dict[str, str | list[str]] | None, cwd: str | None, variables: dict[str, str], - callbacks: TransportCallbacks[dict[str, Any]], + callbacks: TransportCallbacks[Json], ) -> TransportWrapper: if not command: raise RuntimeError('missing "command" to start a child process for running the language server') @@ -466,7 +467,7 @@ def start( env: dict[str, str | list[str]] | None, cwd: str | None, variables: dict[str, str], - callbacks: TransportCallbacks[dict[str, Any]], + callbacks: TransportCallbacks[Json], ) -> TransportWrapper: port = _add_and_resolve_port_variable(variables, self._port) if command: @@ -508,7 +509,7 @@ def start( env: dict[str, str | list[str]] | None, cwd: str | None, variables: dict[str, str], - callbacks: TransportCallbacks[dict[str, Any]], + callbacks: TransportCallbacks[Json], ) -> TransportWrapper: if not command: raise RuntimeError('missing "command" to start a child process for running the language server') @@ -615,7 +616,7 @@ def start( env: dict[str, str | list[str]] | None, cwd: str | None, variables: dict[str, str], - callbacks: TransportCallbacks[dict[str, Any]], + callbacks: TransportCallbacks[Json], ) -> TransportWrapper: if not command: raise RuntimeError('missing "command" to start a child process for running the language server') From d2a72491896eeb7a30604929f5a83f9747c9141b Mon Sep 17 00:00:00 2001 From: Raoul Wols Date: Mon, 1 Jul 2024 20:40:42 +0200 Subject: [PATCH 10/10] Fixup test --- tests/test_protocol.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 9c4558395..29f7171f5 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,6 +1,6 @@ from __future__ import annotations from LSP.plugin.core.protocol import Point, Position, Range, Request, Notification -from LSP.plugin.core.transports import encode_payload, decode_payload +from LSP.plugin.core.transports import encode_json, decode_json import unittest @@ -22,9 +22,9 @@ def test_lsp_conversion(self) -> None: class EncodingTests(unittest.TestCase): def test_encode(self) -> None: - encoded = encode_payload({"text": "😃"}) + encoded = encode_json({"text": "😃"}) self.assertEqual(encoded, b'{"text":"\xF0\x9F\x98\x83"}') - decoded = decode_payload(encoded) + decoded = decode_json(encoded) self.assertEqual(decoded, {"text": "😃"})