Skip to content

Commit

Permalink
Convert workers to use PipeSendChannels or FdStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsheridan committed Nov 24, 2020
1 parent 5d570cf commit af5a774
Showing 1 changed file with 106 additions and 8 deletions.
114 changes: 106 additions & 8 deletions trio/_worker_processes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import os
import struct
from collections import deque
from itertools import count
from multiprocessing import Pipe, Process, Barrier
from multiprocessing.reduction import ForkingPickler
from threading import BrokenBarrierError

from ._core import open_nursery, RunVar, CancelScope, wait_readable
from ._core import (
open_nursery,
RunVar,
CancelScope,
wait_readable,
EndOfChannel,
ClosedResourceError,
)
from ._sync import CapacityLimiter
from ._threads import to_thread_run_sync
from ._timeouts import sleep_forever
Expand All @@ -20,11 +29,14 @@
_proc_counter = count()

if os.name == "nt":
from trio._windows_pipes import PipeSendChannel, PipeReceiveChannel
from ._wait_for_object import WaitForSingleObject

# TODO: This uses a thread per-process. Can we do better?
wait_sentinel = WaitForSingleObject
else:
from ._unix_pipes import FdStream

wait_sentinel = wait_readable


Expand Down Expand Up @@ -155,16 +167,13 @@ def worker_fn():
async def run_sync(self, sync_fn, *args):
# Neither this nor the child process should be waiting at this point
assert not self._barrier.n_waiting, "Must first wake_up() the WorkerProc"
self._rehabilitate_pipes()
async with open_nursery() as nursery:
await nursery.start(self._child_monitor)
try:
await to_thread_run_sync(
self._send_pipe.send, (sync_fn, args), cancellable=True
)
result = await to_thread_run_sync(
self._recv_pipe.recv, cancellable=True
)
except EOFError:
await self._send(ForkingPickler.dumps((sync_fn, args)))
result = ForkingPickler.loads(await self._recv())
except EndOfChannel:
# Likely the worker died while we were waiting on a pipe
self.kill() # Just make sure
# sleep and let the monitor raise the appropriate error to avoid
Expand Down Expand Up @@ -208,6 +217,95 @@ def kill(self):
except AttributeError:
self._proc.terminate()

if os.name == "nt":

def _rehabilitate_pipes(self):
# These must be created in an async context, so defer so
# that this object can be instantiated in e.g. a thread
if not hasattr(self, "_send_chan"):
self._send_chan = PipeSendChannel(self._send_pipe.fileno())
self._recv_chan = PipeReceiveChannel(self._recv_pipe.fileno())

async def _recv(self):
try:
return await self._recv_chan.receive()
except ClosedResourceError as e:
if "process" not in str(e):
raise
# worker probably died but the channel iocp fired first
await sleep_forever() # wait for monitor to see it

async def _send(self, buf):
await self._send_chan.send(buf)

def __del__(self):
# Avoid __del__ errors on cleanup: GH#174, GH#1767
# multiprocessing will close them for us
if hasattr(self, "_send_chan"):
self._send_chan._handle_holder.handle = -1
self._recv_chan._handle_holder.handle = -1

else:

def _rehabilitate_pipes(self):
# These must be created in an async context, so defer so
# that this object can be instantiated in e.g. a thread
if not hasattr(self, "_send_stream"):
self._send_stream = FdStream(self._send_pipe.fileno())
self._recv_stream = FdStream(self._recv_pipe.fileno())

async def _recv(self):
buf = await self._recv_exactly(4)
(size,) = struct.unpack("!i", buf)
if size == -1:
buf = await self._recv_exactly(8)
(size,) = struct.unpack("!Q", buf)
return await self._recv_exactly(size)

async def _recv_exactly(self, size):
result_bytes = bytearray()
while size:
partial_result = await self._recv_stream.receive_some(size)
num_recvd = len(partial_result)
if not num_recvd:
raise OSError("got end of file during message")
result_bytes.extend(partial_result)
if num_recvd > size: # pragma: no cover
raise RuntimeError("Oversized response")
else:
size -= num_recvd
return result_bytes

async def _send(self, buf):
n = len(buf)
if n > 0x7FFFFFFF:
pre_header = struct.pack("!i", -1)
header = struct.pack("!Q", n)
await self._send_stream.send_all(pre_header)
await self._send_stream.send_all(header)
await self._send_stream.send_all(buf)
else:
# For wire compatibility with 3.7 and lower
header = struct.pack("!i", n)
if n > 16384:
# The payload is large so Nagle's algorithm won't be triggered
# and we'd better avoid the cost of concatenation.
await self._send_stream.send_all(header)
await self._send_stream.send_all(buf)
else:
# Issue #20540: concatenate before sending, to avoid delays due
# to Nagle's algorithm on a TCP socket.
# Also note we want to avoid sending a 0-length buffer separately,
# to avoid "broken pipe" errors if the other end closed the pipe.
await self._send_stream.send_all(header + buf)

def __del__(self):
# Avoid __del__ errors on cleanup: GH#174, GH#1767
# multiprocessing will close them for us
if hasattr(self, "_send_stream"):
self._send_stream._fd_holder.fd = -1
self._recv_stream._fd_holder.fd = -1


async def to_process_run_sync(sync_fn, *args, cancellable=False, limiter=None):
"""Run sync_fn in a separate process
Expand Down

0 comments on commit af5a774

Please sign in to comment.