Skip to content

forcely add UV_HANDLE_READABLE on pipe_t #342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions tests/test_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def connection_lost(self, exc):

class MyWritePipeProto(asyncio.BaseProtocol):
done = None
paused = False

def __init__(self, loop=None):
self.state = 'INITIAL'
Expand All @@ -61,6 +62,12 @@ def connection_lost(self, exc):
if self.done:
self.done.set_result(None)

def pause_writing(self):
self.paused = True

def resume_writing(self):
self.paused = False


class _BasePipeTest:
def test_read_pipe(self):
Expand Down Expand Up @@ -241,6 +248,29 @@ def reader(data):
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)

def test_write_buffer_full(self):
rpipe, wpipe = os.pipe()
pipeobj = io.open(wpipe, 'wb', 1024)

proto = MyWritePipeProto(loop=self.loop)
connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
transport, p = self.loop.run_until_complete(connect)
self.assertIs(p, proto)
self.assertIs(transport, proto.transport)
self.assertEqual('CONNECTED', proto.state)

for i in range(32):
transport.write(b'x' * 32768)
if proto.paused:
transport.write(b'x' * 32768)
break
else:
self.fail("Didn't reach a full buffer")

os.close(rpipe)
self.loop.run_until_complete(asyncio.wait_for(proto.done, 1))
self.assertEqual('CLOSED', proto.state)


class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase):
pass
Expand Down
49 changes: 49 additions & 0 deletions tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,55 @@ async def test_subprocess():

loop.run_until_complete(test_subprocess())

def test_write_huge_stdin_8192(self):
self._test_write_huge_stdin(8192)

def test_write_huge_stdin_8193(self):
self._test_write_huge_stdin(8193)

def test_write_huge_stdin_219263(self):
self._test_write_huge_stdin(219263)

def test_write_huge_stdin_219264(self):
self._test_write_huge_stdin(219264)

def _test_write_huge_stdin(self, buf_size):
code = '''
import sys
n = 0
while True:
line = sys.stdin.readline()
if not line:
print("unexpected EOF", file=sys.stderr)
break
if line == "END\\n":
break
n+=1
print(n)'''
num_lines = buf_size - len(b"END\n")
args = [sys.executable, b'-W', b'ignore', b'-c', code]

async def test():
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE)
data = b"\n" * num_lines + b"END\n"
self.assertEqual(len(data), buf_size)
proc.stdin.write(data)
await asyncio.wait_for(proc.stdin.drain(), timeout=1.0)
try:
await asyncio.wait_for(proc.wait(), timeout=1.0)
except asyncio.TimeoutError:
proc.kill()
proc.stdin.close()
await proc.wait()
raise
out = await proc.stdout.read()
self.assertEqual(int(out), num_lines)

self.loop.run_until_complete(test())


class Test_UV_Process(_TestProcess, tb.UVTestCase):

Expand Down
4 changes: 0 additions & 4 deletions uvloop/handles/pipe.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ cdef class ReadUnixTransport(UVStream):

cdef class WriteUnixTransport(UVStream):

cdef:
uv.uv_poll_t disconnect_listener
bint disconnect_listener_inited

@staticmethod
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
object waiter)
66 changes: 3 additions & 63 deletions uvloop/handles/pipe.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
err = uv.uv_pipe_init(handle._loop.uvloop,
<uv.uv_pipe_t*>handle._handle,
0)
# UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
# even if it is O_WRONLY, see also #317, libuv/libuv#2058
handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE
if err < 0:
handle._abort_init()
raise convert_error(err)
Expand Down Expand Up @@ -147,10 +150,6 @@ cdef class ReadUnixTransport(UVStream):
@cython.no_gc_clear
cdef class WriteUnixTransport(UVStream):

def __cinit__(self):
self.disconnect_listener_inited = False
self.disconnect_listener.data = NULL

@staticmethod
cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
object waiter):
Expand All @@ -167,46 +166,6 @@ cdef class WriteUnixTransport(UVStream):
__pipe_init_uv_handle(<UVStream>handle, loop)
return handle

cdef _start_reading(self):
# A custom implementation for monitoring for EOF:
# libuv since v1.23.1 prohibits using uv_read_start on
# write-only FDs, so we use a throw-away uv_poll_t handle
# for that purpose, as suggested in
# https://github.com/libuv/libuv/issues/2058.

cdef int err

if not self.disconnect_listener_inited:
err = uv.uv_poll_init(self._loop.uvloop,
&self.disconnect_listener,
self._fileno())
if err < 0:
raise convert_error(err)
self.disconnect_listener.data = <void*>self
self.disconnect_listener_inited = True

err = uv.uv_poll_start(&self.disconnect_listener,
uv.UV_READABLE | uv.UV_DISCONNECT,
__on_write_pipe_poll_event)
if err < 0:
raise convert_error(err)

cdef _stop_reading(self):
cdef int err
if not self.disconnect_listener_inited:
return
err = uv.uv_poll_stop(&self.disconnect_listener)
if err < 0:
raise convert_error(err)

cdef _close(self):
if self.disconnect_listener_inited:
self.disconnect_listener.data = NULL
uv.uv_close(<uv.uv_handle_t *>(&self.disconnect_listener), NULL)
self.disconnect_listener_inited = False

UVStream._close(self)

cdef _new_socket(self):
return __pipe_get_socket(<UVSocketHandle>self)

Expand All @@ -220,25 +179,6 @@ cdef class WriteUnixTransport(UVStream):
raise NotImplementedError


cdef void __on_write_pipe_poll_event(uv.uv_poll_t* handle,
int status, int events) with gil:
cdef WriteUnixTransport tr

if handle.data is NULL:
return

tr = <WriteUnixTransport>handle.data
if tr._closed:
return

if events & uv.UV_DISCONNECT:
try:
tr._stop_reading()
tr._on_eof()
except BaseException as ex:
tr._fatal_error(ex, False)


cdef class _PipeConnectRequest(UVRequest):
cdef:
UnixTransport transport
Expand Down
10 changes: 10 additions & 0 deletions uvloop/includes/uv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ from posix.types cimport gid_t, uid_t

from . cimport system

# This is an internal enum UV_HANDLE_READABLE from uv-common.h, used only by
# handles/pipe.pyx to temporarily workaround a libuv issue libuv/libuv#2058,
# before there is a proper fix in libuv. In short, libuv disallowed feeding a
# write-only pipe to uv_read_start(), which was needed by uvloop to detect a
# broken pipe without having to send anything on the write-only end. We're
# setting UV_HANDLE_READABLE on pipe_t to workaround this limitation
# temporarily, please see also #317.
cdef enum:
UV_INTERNAL_HANDLE_READABLE = 0x00004000

cdef extern from "uv.h" nogil:
cdef int UV_TCP_IPV6ONLY
Expand Down Expand Up @@ -82,6 +91,7 @@ cdef extern from "uv.h" nogil:
ctypedef struct uv_handle_t:
void* data
uv_loop_t* loop
unsigned int flags
# ...

ctypedef struct uv_idle_t:
Expand Down