From 54a734b15b80c36f43eecb6b8859c0c4835cb165 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 23 Jul 2021 19:45:57 -0700 Subject: [PATCH] Read smaller frames to workaround OpenSSL bug As older versions of OpenSSL (in particular 1.0.2) have limitations on the size of buffers they can work with, take small views into our larger buffer and read those in instead. This should keep the buffer sizes more manageable for OpenSSL. --- distributed/comm/tcp.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index de6cee016d8..3185149bee2 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -1,3 +1,4 @@ +import ctypes import errno import functools import logging @@ -19,6 +20,8 @@ from tornado.tcpclient import TCPClient from tornado.tcpserver import TCPServer +from tlz import sliding_window + import dask from dask.utils import parse_timedelta @@ -35,6 +38,7 @@ MAX_BUFFER_SIZE = MEMORY_LIMIT / 2 +C_MAX_INT = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1 def set_tcp_timeout(comm): @@ -193,9 +197,13 @@ async def read(self, deserializers=None): frames_nbytes = await stream.read_bytes(fmt_size) (frames_nbytes,) = struct.unpack(fmt, frames_nbytes) - frames = bytearray(frames_nbytes) - n = await stream.read_into(frames) - assert n == frames_nbytes, (n, frames_nbytes) + frames = memoryview(bytearray(frames_nbytes)) + # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1) + for i, j in sliding_window(range(0, frames_nbytes + C_MAX_INT, C_MAX_INT)): + chunk = frames[i:j] + chunk_nbytes = len(chunk) + n = await stream.read_into(chunk) + assert n == chunk_nbytes, (n, chunk_nbytes) except StreamClosedError as e: self.stream = None self._closed = True