Skip to content

Commit

Permalink
Read smaller frames to workaround OpenSSL bug
Browse files Browse the repository at this point in the history
As older versions of OpenSSL (in particular 1.0.2) have limitations on
the size of buffers they can work with, take small views into our larger
buffer and read those in instead. This should keep the buffer sizes more
manageable for OpenSSL.
  • Loading branch information
jakirkham committed Jul 24, 2021
1 parent 9c30f38 commit 54a734b
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ctypes
import errno
import functools
import logging
Expand All @@ -19,6 +20,8 @@
from tornado.tcpclient import TCPClient
from tornado.tcpserver import TCPServer

from tlz import sliding_window

import dask
from dask.utils import parse_timedelta

Expand All @@ -35,6 +38,7 @@


MAX_BUFFER_SIZE = MEMORY_LIMIT / 2
C_MAX_INT = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1


def set_tcp_timeout(comm):
Expand Down Expand Up @@ -193,9 +197,13 @@ async def read(self, deserializers=None):
frames_nbytes = await stream.read_bytes(fmt_size)
(frames_nbytes,) = struct.unpack(fmt, frames_nbytes)

frames = bytearray(frames_nbytes)
n = await stream.read_into(frames)
assert n == frames_nbytes, (n, frames_nbytes)
frames = memoryview(bytearray(frames_nbytes))
# Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1)
for i, j in sliding_window(range(0, frames_nbytes + C_MAX_INT, C_MAX_INT)):
chunk = frames[i:j]
chunk_nbytes = len(chunk)
n = await stream.read_into(chunk)
assert n == chunk_nbytes, (n, chunk_nbytes)
except StreamClosedError as e:
self.stream = None
self._closed = True
Expand Down

0 comments on commit 54a734b

Please sign in to comment.