diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index de6cee016d8..1b8c063f58f 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -1,3 +1,4 @@ +import ctypes import errno import functools import logging @@ -14,6 +15,7 @@ except ImportError: ssl = None +from tlz import sliding_window from tornado import netutil from tornado.iostream import StreamClosedError from tornado.tcpclient import TCPClient @@ -35,6 +37,7 @@ MAX_BUFFER_SIZE = MEMORY_LIMIT / 2 +C_MAX_INT = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1 def set_tcp_timeout(comm): @@ -193,9 +196,15 @@ async def read(self, deserializers=None): frames_nbytes = await stream.read_bytes(fmt_size) (frames_nbytes,) = struct.unpack(fmt, frames_nbytes) - frames = bytearray(frames_nbytes) - n = await stream.read_into(frames) - assert n == frames_nbytes, (n, frames_nbytes) + frames = memoryview(bytearray(frames_nbytes)) + # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1) + for i, j in sliding_window( + 2, range(0, frames_nbytes + C_MAX_INT, C_MAX_INT) + ): + chunk = frames[i:j] + chunk_nbytes = len(chunk) + n = await stream.read_into(chunk) + assert n == chunk_nbytes, (n, chunk_nbytes) except StreamClosedError as e: self.stream = None self._closed = True