From 33f197fd1489121384663bf6f4c83848031d20f6 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 3 Jun 2020 18:25:08 -0700 Subject: [PATCH 1/9] Test byte serialization in frame splitting case In some cases where the frames are particularly large, we may opt to split them into smaller frames. This may be due to performance reasons when transmitting data or it may be due to limitations like those of compressors used to compact frames. So include a test case that we know will get split to make sure it is handled correctly. Or at least make sure we are catching errors that would cause it to be mishandled. --- distributed/protocol/tests/test_serialize.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index 4cad5a3653b..2799e2c0ea9 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -190,9 +190,12 @@ def test_empty_loads_deep(): assert isinstance(e2[0][0][0], Empty) -def test_serialize_bytes(): - for x in [1, "abc", np.arange(5), b"ab" * int(40e6)]: - b = serialize_bytes(x) +@pytest.mark.parametrize( + "kwargs", [{}, {"serializers": ["pickle"]},], +) +def test_serialize_bytes(kwargs): + for x in [1, "abc", np.arange(5), b"ab" * int(40e6), 2 ** 26 * b"ab"]: + b = serialize_bytes(x, **kwargs) assert isinstance(b, bytes) y = deserialize_bytes(b) assert str(x) == str(y) From 33594d3c22dfa3fac4fdb82f7860da3492dafd49 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Wed, 3 Jun 2020 18:26:00 -0700 Subject: [PATCH 2/9] Merge frames in `deserialize_bytes` It appears that we are splitting frames in `serialize_bytelist` so that we can compress them. However we are not merging them back together afterwards during deserialization. This can cause an exception to be raised by serializers that expected their frames to be structured in a particular way.
To fix this, we make sure to call `merge_frames` after `decompress` in `deserialize_bytes`. Further we make sure to pack the `lengths` of the original `frames` in the `header` (if not already present) in `serialize_bytelist`. This should ensure deserializers get the original frame structuring back when they operate on them. --- distributed/protocol/serialize.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/serialize.py b/distributed/protocol/serialize.py index e4fba2b7ba9..4ed27bf278b 100644 --- a/distributed/protocol/serialize.py +++ b/distributed/protocol/serialize.py @@ -9,12 +9,13 @@ import msgpack from . import pickle -from ..utils import has_keyword, typename +from ..utils import has_keyword, nbytes, typename from .compression import maybe_compress, decompress from .utils import ( unpack_frames, pack_frames_prelude, frame_split_size, + merge_frames, ensure_bytes, msgpack_opts, ) @@ -473,6 +474,8 @@ def replace_inner(x): def serialize_bytelist(x, **kwargs): header, frames = serialize(x, **kwargs) + if "lengths" not in header: + header["lengths"] = tuple(map(nbytes, frames)) frames = sum(map(frame_split_size, frames), []) if frames: compression, frames = zip(*map(maybe_compress, frames)) @@ -499,6 +502,7 @@ def deserialize_bytes(b): else: header = {} frames = decompress(header, frames) + frames = merge_frames(header, frames) return deserialize(header, frames) From 59afbde19b2a7a64f2ab0e895bf189d46dcd81d5 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 4 Jun 2020 09:13:10 -0700 Subject: [PATCH 3/9] Add fast-path when only one frame is needed --- distributed/protocol/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index e58732b881c..277ac2d9dc9 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -67,6 +67,9 @@ def merge_frames(header, frames): if all(len(f) == l for f, l in zip(frames, lengths)): return
frames + if len(lengths) == 1: + return [b"".join(map(ensure_bytes, frames))] + frames = frames[::-1] lengths = lengths[::-1] From 9ab510f9924a36b7f517098b4e105bec43c58fbc Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 4 Jun 2020 09:16:32 -0700 Subject: [PATCH 4/9] Test serializing a collection of `bytes` objects --- distributed/protocol/tests/test_serialize.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index 2799e2c0ea9..a7343ed783d 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -194,7 +194,14 @@ def test_empty_loads_deep(): "kwargs", [{}, {"serializers": ["pickle"]},], ) def test_serialize_bytes(kwargs): - for x in [1, "abc", np.arange(5), b"ab" * int(40e6), 2 ** 26 * b"ab"]: + for x in [ + 1, + "abc", + np.arange(5), + b"ab" * int(40e6), + 2 ** 26 * b"ab", + (2 ** 25 * b"ab", 2 ** 25 * b"ab"), + ]: b = serialize_bytes(x, **kwargs) assert isinstance(b, bytes) y = deserialize_bytes(b) From f688b21ca7ddb1356f02872f4df19b9efdc093e5 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 4 Jun 2020 09:23:46 -0700 Subject: [PATCH 5/9] Avoid copying `bytes` As using the `bytes` constructor will cause a copy (even if it is provided a `bytes` object), special case handling a `bytes` object and just return. 
--- distributed/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index f43b2f7acc0..dec1b6b79d3 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -931,7 +931,9 @@ def ensure_bytes(s): >>> ensure_bytes(b'123') b'123' """ - if hasattr(s, "encode"): + if isinstance(s, bytes): + return s + elif hasattr(s, "encode"): return s.encode() else: try: From 198f81fc23020561131834e51386b9ff5b004eee Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 4 Jun 2020 14:13:17 -0700 Subject: [PATCH 6/9] Only fast-path single frame case --- distributed/protocol/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index 277ac2d9dc9..b0ca95b6c0e 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -67,8 +67,8 @@ def merge_frames(header, frames): if all(len(f) == l for f, l in zip(frames, lengths)): return frames - if len(lengths) == 1: - return [b"".join(map(ensure_bytes, frames))] + if len(lengths) == 1 and len(frames) == 1: + return frames frames = frames[::-1] lengths = lengths[::-1] From 3b14b9985f60792840a21c885a3fc5ed2c448459 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 5 Jun 2020 10:24:50 -0700 Subject: [PATCH 7/9] Drop fast-path for single frame This is already handled in the check above. 
--- distributed/protocol/utils.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index b0ca95b6c0e..e58732b881c 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -67,9 +67,6 @@ def merge_frames(header, frames): if all(len(f) == l for f, l in zip(frames, lengths)): return frames - if len(lengths) == 1 and len(frames) == 1: - return frames - frames = frames[::-1] lengths = lengths[::-1] From eafbf77216f3bda46612882cc73174d3c90168cd Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 5 Jun 2020 10:25:04 -0700 Subject: [PATCH 8/9] Drop zero frame case This is already handled by the fast path for more than zero frames. --- distributed/protocol/utils.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py index e58732b881c..fa020dae909 100644 --- a/distributed/protocol/utils.py +++ b/distributed/protocol/utils.py @@ -59,9 +59,6 @@ def merge_frames(header, frames): """ lengths = list(header["lengths"]) - if not frames: - return frames - assert sum(lengths) == sum(map(nbytes, frames)) if all(len(f) == l for f, l in zip(frames, lengths)): From 63a2086e7749bffeb508554b231c215780b90a45 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 5 Jun 2020 10:45:47 -0700 Subject: [PATCH 9/9] Wrap `**` with `int` --- distributed/protocol/tests/test_serialize.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/protocol/tests/test_serialize.py b/distributed/protocol/tests/test_serialize.py index a7343ed783d..57fceaea0c9 100644 --- a/distributed/protocol/tests/test_serialize.py +++ b/distributed/protocol/tests/test_serialize.py @@ -199,8 +199,8 @@ def test_serialize_bytes(kwargs): "abc", np.arange(5), b"ab" * int(40e6), - 2 ** 26 * b"ab", - (2 ** 25 * b"ab", 2 ** 25 * b"ab"), + int(2 ** 26) * b"ab", + (int(2 ** 25) * b"ab", int(2 ** 25) * b"ab"), ]: b = serialize_bytes(x, **kwargs) assert
isinstance(b, bytes)