Skip to content

Implement overlapped I/O and timeouts on server side Windows IPC #6148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jan 8, 2019
Merged
6 changes: 0 additions & 6 deletions docs/source/mypy_daemon.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@ you'll find errors sooner.
and it can only process one type checking request at a time. You can
run multiple mypy daemon processes to type check multiple repositories.

.. note::

On Windows, due to platform limitations, the mypy daemon does not currently
support a timeout for the server process. The client will still time out if
a connection to the server cannot be made, but the server will wait forever
for a new client connection.

Basic usage
***********
Expand Down
99 changes: 71 additions & 28 deletions mypy/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,66 @@ class IPCBase:

connection = None # type: _IPCHandle

def __init__(self, name: str) -> None:
def __init__(self, name: str, timeout: Optional[float]) -> None:
self.READ_SIZE = 100000
self.name = name
self.timeout = timeout

def read(self) -> bytes:
"""Read bytes from an IPC connection until its empty."""
bdata = bytearray()
while True:
if sys.platform == 'win32':
more, _ = _winapi.ReadFile(self.connection, self.READ_SIZE)
else:
if sys.platform == 'win32':
while True:
ov, err = _winapi.ReadFile(self.connection, self.READ_SIZE, overlapped=True)
# TODO: remove once typeshed supports Literal types
assert isinstance(ov, _winapi.Overlapped)
assert isinstance(err, int)
try:
if err != 0:
assert err == _winapi.ERROR_IO_PENDING
timeout = int(self.timeout * 1000) if self.timeout else _winapi.INFINITE
res = _winapi.WaitForSingleObject(ov.event, timeout)
assert res == _winapi.WAIT_OBJECT_0
except BaseException:
ov.cancel()
raise
_, err = ov.GetOverlappedResult(True)
more = ov.getbuffer()
if more:
bdata.extend(more)
if err == 0:
# we are done!
break
elif err == _winapi.ERROR_OPERATION_ABORTED:
raise IPCException("ReadFile operation aborted.")
else:
while True:
more = self.connection.recv(self.READ_SIZE)
if not more:
break
bdata.extend(more)
if not more:
break
bdata.extend(more)
return bytes(bdata)

def write(self, data: bytes) -> None:
"""Write bytes to an IPC connection."""
if sys.platform == 'win32':
try:
# Only send data if there is data to send, to avoid it
# being confused with the empty message sent to terminate
# the connection. (We will still send the end-of-message
# empty message below, which will cause read to return.)
if data:
_winapi.WriteFile(self.connection, data)
# this empty write is to copy the behavior of socket.sendall,
# which also sends an empty message to signify it is done writing
_winapi.WriteFile(self.connection, b'')
ov, err = _winapi.WriteFile(self.connection, data, overlapped=True)
# TODO: remove once typeshed supports Literal types
assert isinstance(ov, _winapi.Overlapped)
assert isinstance(err, int)
try:
if err != 0:
assert err == _winapi.ERROR_IO_PENDING
timeout = int(self.timeout * 1000) if self.timeout else _winapi.INFINITE
res = _winapi.WaitForSingleObject(ov.event, timeout)
assert res == _winapi.WAIT_OBJECT_0
except BaseException:
ov.cancel()
raise
bytes_written, err = ov.GetOverlappedResult(True)
assert err == 0
assert bytes_written == len(data)
except WindowsError as e:
raise IPCException("Failed to write with error: {}".format(e.winerror))
else:
Expand All @@ -95,9 +125,9 @@ class IPCClient(IPCBase):
"""The client side of an IPC connection."""

def __init__(self, name: str, timeout: Optional[float]) -> None:
super().__init__(name)
super().__init__(name, timeout)
if sys.platform == 'win32':
timeout = int(timeout * 1000) if timeout else 0xFFFFFFFF # NMPWAIT_WAIT_FOREVER
timeout = int(self.timeout * 1000) if self.timeout else _winapi.NMPWAIT_WAIT_FOREVER
try:
_winapi.WaitNamedPipe(self.name, timeout)
except FileNotFoundError:
Expand All @@ -114,7 +144,7 @@ def __init__(self, name: str, timeout: Optional[float]) -> None:
0,
_winapi.NULL,
_winapi.OPEN_EXISTING,
0,
_winapi.FILE_FLAG_OVERLAPPED,
_winapi.NULL,
)
except WindowsError as e:
Expand Down Expand Up @@ -147,25 +177,26 @@ class IPCServer(IPCBase):

BUFFER_SIZE = 2**16

def __init__(self, name: str, timeout: Optional[int] = None) -> None:
def __init__(self, name: str, timeout: Optional[float] = None) -> None:
if sys.platform == 'win32':
name = r'\\.\pipe\{}-{}.pipe'.format(
name, base64.urlsafe_b64encode(os.urandom(6)).decode())
else:
name = '{}.sock'.format(name)
super().__init__(name)
super().__init__(name, timeout)
if sys.platform == 'win32':
self.connection = _winapi.CreateNamedPipe(self.name,
_winapi.PIPE_ACCESS_DUPLEX
| _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
| _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
| _winapi.FILE_FLAG_OVERLAPPED,
_winapi.PIPE_READMODE_MESSAGE
| _winapi.PIPE_TYPE_MESSAGE
| _winapi.PIPE_WAIT
| 0x8, # PIPE_REJECT_REMOTE_CLIENTS
1, # one instance
self.BUFFER_SIZE,
self.BUFFER_SIZE,
1000, # Default timeout in milis
_winapi.NMPWAIT_WAIT_FOREVER,
0, # Use default security descriptor
)
if self.connection == -1: # INVALID_HANDLE_VALUE
Expand All @@ -185,12 +216,24 @@ def __enter__(self) -> 'IPCServer':
# NOTE: It is theoretically possible that this will hang forever if the
# client never connects, though this can be "solved" by killing the server
try:
_winapi.ConnectNamedPipe(self.connection, _winapi.NULL)
ov = _winapi.ConnectNamedPipe(self.connection, overlapped=True)
# TODO: remove once typeshed supports Literal types
assert isinstance(ov, _winapi.Overlapped)
except WindowsError as e:
if e.winerror == _winapi.ERROR_PIPE_CONNECTED:
pass # The client already exists, which is fine.
else:
# Don't raise if the client already exists, or the client already connected
if e.winerror not in (_winapi.ERROR_PIPE_CONNECTED, _winapi.ERROR_NO_DATA):
raise
else:
try:
timeout = int(self.timeout * 1000) if self.timeout else _winapi.INFINITE
res = _winapi.WaitForSingleObject(ov.event, timeout)
assert res == _winapi.WAIT_OBJECT_0
except BaseException:
ov.cancel()
_winapi.CloseHandle(self.connection)
raise
_, err = ov.GetOverlappedResult(True)
assert err == 0
else:
try:
self.connection, _ = self.sock.accept()
Expand Down
8 changes: 8 additions & 0 deletions test-data/unit/daemon.test
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ import bar
[file bar.py]
pass

[case testDaemonTimeout]
$ dmypy start --timeout 1 -- --follow-imports=error
Daemon started
$ {python} -c "import time;time.sleep(1)"
$ dmypy status
No status file found
== Return code: 2

[case testDaemonRunNoTarget]
$ dmypy run -- --follow-imports=error
Daemon started
Expand Down