Merge frames in deserialize_bytes
#3639
Changes from all commits
33f197f
33594d3
59afbde
9ab510f
f688b21
198f81f
3b14b99
eafbf77
63a2086
@@ -9,12 +9,13 @@
 import msgpack

 from . import pickle
-from ..utils import has_keyword, typename
+from ..utils import has_keyword, nbytes, typename
 from .compression import maybe_compress, decompress
 from .utils import (
     unpack_frames,
     pack_frames_prelude,
     frame_split_size,
+    merge_frames,
     ensure_bytes,
     msgpack_opts,
 )
@@ -473,6 +474,8 @@ def replace_inner(x):

 def serialize_bytelist(x, **kwargs):
     header, frames = serialize(x, **kwargs)
+    if "lengths" not in header:
+        header["lengths"] = tuple(map(nbytes, frames))
     frames = sum(map(frame_split_size, frames), [])

Review comment: Note that the frames are already split here. This is due to constraints caused by compression (which happens below).

     if frames:
         compression, frames = zip(*map(maybe_compress, frames))
@@ -499,6 +502,7 @@ def deserialize_bytes(b):
     else:
         header = {}
     frames = decompress(header, frames)
+    frames = merge_frames(header, frames)
     return deserialize(header, frames)


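The round trip in the two hunks above (record the original frame lengths, split the frames, then merge them back before deserializing) can be illustrated with a minimal, self-contained sketch. It assumes every frame is a plain `bytes` object and that each frame produces at least one chunk. `split_frame` and `merge_by_lengths` are hypothetical stand-ins written for this example; they are not the `frame_split_size` and `merge_frames` functions from `distributed`, whose behavior may differ.

```python
def split_frame(frame, max_size):
    """Hypothetical helper: cut one bytes frame into chunks of at most max_size bytes."""
    if len(frame) <= max_size:
        return [frame]
    return [frame[i:i + max_size] for i in range(0, len(frame), max_size)]


def merge_by_lengths(lengths, chunks):
    """Hypothetical helper: rejoin chunks so the i-th result has lengths[i] bytes."""
    merged = []
    chunks = iter(chunks)
    for length in lengths:
        # Every original frame contributed at least one chunk (possibly empty).
        parts = [next(chunks)]
        size = len(parts[0])
        while size < length:
            part = next(chunks)
            parts.append(part)
            size += len(part)
        if size != length:
            raise ValueError("chunk boundaries do not line up with recorded lengths")
        merged.append(b"".join(parts))
    return merged


frames = [b"a" * 10, b"", b"b" * 3]
# Record the lengths before splitting, as serialize_bytelist does in the diff.
lengths = tuple(map(len, frames))
chunks = sum((split_frame(f, 4) for f in frames), [])
restored = merge_by_lengths(lengths, chunks)
```

Recording the lengths before the split matters because the split chunks alone no longer say where one original frame ends and the next begins.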
@@ -931,7 +931,9 @@ def ensure_bytes(s):
     >>> ensure_bytes(b'123')
     b'123'
     """
-    if hasattr(s, "encode"):
+    if isinstance(s, bytes):

Review comments:

I don't believe

Hmm...maybe I'm misremembering. In any event we seem to have similar code in Dask.

OK, well never mind - this version shouldn't hurt. I wonder why not just call the dask version? I suppose this one can work on memoryviews and bytearrays too.

Yeah I was thinking about that as well, but didn't want to go down a rabbit hole here. Am ok pulling this out into a separate PR so we can explore orthogonally. WDYT?

I'm saying your version is fine :)

FWIW updating the Dask implementation to contain the code from Distributed in PR (dask/dask#9050). Then we should be able to switch to the Dask implementation in Distributed. Edit: Switched over to using the Dask implementation in Distributed with PR (#6295).

+        return s
+    elif hasattr(s, "encode"):
         return s.encode()
     else:
         try:
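The branch order introduced in this hunk can be sketched as a standalone function. The hunk is cut off at `try:`, so everything from that point on (converting with `bytes(s)` and raising `TypeError` on failure) is an assumption made for illustration, and `to_bytes` is a hypothetical name rather than the real `ensure_bytes`.

```python
def to_bytes(s):
    """Hypothetical sketch of the branch order shown in the diff above."""
    if isinstance(s, bytes):
        # Fast path added by the diff: bytes objects are returned unchanged.
        return s
    elif hasattr(s, "encode"):
        # str (and anything else exposing .encode) is encoded to bytes.
        return s.encode()
    else:
        # Assumed fallback: the diff is truncated here, so this part is a guess.
        try:
            return bytes(s)
        except Exception as e:
            raise TypeError(
                f"Object {s!r} is neither a bytes object nor has an encode method"
            ) from e


original = b"123"
same = to_bytes(original)
from_str = to_bytes("123")
from_bytearray = to_bytes(bytearray(b"123"))
from_memoryview = to_bytes(memoryview(b"123"))
```

With the `isinstance` check first, a `bytes` input is handed back as the same object instead of going through the fallback branch, which is consistent with the reviewer's remark that this version also handles memoryviews and bytearrays.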
We (you :) ) recently added lengths to CUDA object headers. When is lengths not in header? Is this something we should be requiring?
Yeah still wrapping my head around this. My understanding is the header from the object has already gone through msgpack at this point so is actually a frame as well. So it may be we always need to set the lengths here.