Merge frames in deserialize_bytes
#3639
Conversation
```
@@ -440,6 +441,8 @@ def replace_inner(x):


def serialize_bytelist(x, **kwargs):
    header, frames = serialize(x, **kwargs)
    if "lengths" not in header:
```
We (you :) ) recently added `lengths` to CUDA object headers. When is `lengths` not in `header`? Is this something we should be requiring?
Yeah, still wrapping my head around this. My understanding is that the header from the object has already gone through msgpack at this point, so it is actually a frame as well. So it may be that we always need to set the `lengths` here.

@mrocklin, do you have any thoughts on this? 🙂
I have no particular thoughts on this.

Does this need a test?

This appears to be stalled. @jakirkham @quasiben, are you all still active here? Should we close this?

Yeah, still working on this.

Another friendly ping and status request, just to keep this thread going.

I've been out on PTO for the last week.
In some cases where the frames are particularly large, we may opt to split them into smaller frames. This may be for performance reasons when transmitting data, or due to limitations like those of the compressors used to compact frames. So include a test case that we know will get split, to make sure it is handled correctly; or at least make sure we are catching errors that would cause it to be mishandled.
It appears that we are splitting frames in `serialize_bytelist` so that we can compress them. However, we are not merging them back together afterwards during deserialization. This can cause an exception to be raised by serializers that expected their frames to be structured in a particular way. To fix this, we make sure to call `merge_frames` after `decompress` in `deserialize_bytes`. Further, we make sure to pack the `lengths` of the original `frames` in the `header` (if not already present) in `serialize_bytelist`. This should ensure deserializers get the original frame structuring back when they operate on them.
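The round trip described here can be sketched in miniature. This is illustrative only, not the actual `distributed` code: the helper names and the tiny 4-byte split limit are made up for the example.

```python
# Sketch of the fix described above: record original frame lengths before
# splitting, then merge split chunks back to those lengths on the way out.

def split(frame, limit=4):
    """Split one frame into chunks of at most `limit` bytes."""
    return [frame[i:i + limit] for i in range(0, len(frame), limit)] or [frame]

def serialize_bytelist_sketch(frames, header):
    # Pack the original lengths into the header (if not already present)
    # so deserialization knows how to reassemble the split chunks.
    if "lengths" not in header:
        header["lengths"] = tuple(len(f) for f in frames)
    return header, [chunk for f in frames for chunk in split(f)]

def merge_frames_sketch(header, frames):
    """Rejoin chunks so each output frame has its originally recorded length."""
    out, it = [], iter(frames)
    for length in header["lengths"]:
        buf = b""
        while len(buf) < length:
            buf += next(it)
        out.append(buf)
    return out

header, chunks = serialize_bytelist_sketch([b"abcdefgh"], {})
assert chunks == [b"abcd", b"efgh"]          # the frame was split
assert merge_frames_sketch(header, chunks) == [b"abcdefgh"]  # structure restored
```

Without the `lengths` entry (and the merge on the way out), a deserializer that expected one frame would instead see two, which matches the exception described above.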
@michaelnarodovitch @gshimansky, could you please give this a try?

Also, sorry for the long delay, Martin and Matt. This now has a test. The first commit demonstrates this fails without the change. The second commit demonstrates the change fixes it.

Inasmuch as this fixes something that was shown broken, I am happy; but I can't pretend to understand the reason.

Pulled via `pip install git+https://github.com/jakirkham/distributed.git@33594d3c22dfa3fac4fdb82f7860da3492dafd49` and reran my (previously failing) use case with that version. Works flawlessly now. The fix reveals the failure mode in my use case: the dataframe, read from disk after it was spilled, ended up being split into multiple frames, which, in turn, caused

Yes! I checked your branch

I'd just like to repeat, @jakirkham:

Yeah, I'm just looking at this now, Martin. Will see if we have a sensible way to fast-path it. Had asked folks to try this out to confirm it actually works for them (before spending more time). Sounds like it is working and therefore worth spending more time on.

Perfect. I imagine an
As using the `bytes` constructor will cause a copy (even if it is provided a `bytes` object), special case handling a `bytes` object and just return.
Have pushed a commit to short-circuit where we want a single frame from merging. Also added another test case where there are two frames that both get split. Finally, realized that
```
@@ -931,7 +931,9 @@ def ensure_bytes(s):
    >>> ensure_bytes(b'123')
    b'123'
    """
    if hasattr(s, "encode"):
    if isinstance(s, bytes):
```
I don't believe `bytes(s)` does make a copy if `s` is already `bytes`.
Hmm...maybe I'm misremembering. In any event we seem to have similar code in Dask.
OK, well never mind - this version shouldn't hurt. I wonder why not just call the dask version? I suppose this one can work on memoryviews and bytearrays too.
Yeah I was thinking about that as well, but didn't want to go down a rabbit hole here. Am ok pulling this out into a separate PR so we can explore orthogonally. WDYT?
I'm saying your version is fine :)
FWIW, updating the Dask implementation to contain the code from Distributed in PR (dask/dask#9050). Then we should be able to switch to the Dask implementation in Distributed.

Edit: Switched over to using the Dask implementation in Distributed with PR (#6295).
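For what it's worth, the copy question above is easy to check empirically. This is behavior observed in CPython, where `bytes()` applied to an exact `bytes` instance returns the same object:

```python
# In CPython, bytes() on an object that is already bytes returns the same
# object (no copy); bytearray and memoryview inputs are copied into a new
# bytes object.
b = b"123"
assert bytes(b) is b                                 # no copy for a bytes input

ba = bytearray(b"123")
assert bytes(ba) == b"123" and bytes(ba) is not ba   # converted (copied)

mv = memoryview(b"123")
assert bytes(mv) == b"123"                           # also copied into new bytes
```

So the `isinstance(s, bytes)` fast path in `ensure_bytes` mainly documents intent; either way, no extra copy is made for `bytes` inputs.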
distributed/protocol/utils.py
Outdated
```
@@ -67,6 +67,9 @@ def merge_frames(header, frames):
    if all(len(f) == l for f, l in zip(frames, lengths)):
        return frames

    if len(lengths) == 1:
        return [b"".join(map(ensure_bytes, frames))]
```
Does it not imply only one frame, so you can return `ensure_bytes(frames[0])`?
No, it doesn't. It implies there was only one frame originally. However, we may already have split it up during serialization. Noted the line where this happens below.
Is it worth, then, checking for exactly one frame, to avoid the copy during `join`?
Not sure. Was trying to integrate your feedback above. Happy to revert it if you prefer 🙂
Tried to respond inline, but GitHub didn't like it. Moved here: #3639 (review)
```
@@ -473,6 +474,8 @@ def replace_inner(x):


def serialize_bytelist(x, **kwargs):
    header, frames = serialize(x, **kwargs)
    if "lengths" not in header:
        header["lengths"] = tuple(map(nbytes, frames))
    frames = sum(map(frame_split_size, frames), [])
```
Note that the frames are already split here. This is due to constraints caused by compression (which happens below).
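A hedged sketch of what "already split here" means in practice. The 2 KiB limit and the helper name are invented for the example; the real split size in `distributed` is configurable and much larger:

```python
import zlib

# Illustrative splitting of oversized frames before compression.
LIMIT = 2048  # made-up limit for the example

def frame_split_size_sketch(frame, limit=LIMIT):
    """Split a frame into chunks of at most `limit` bytes."""
    view = memoryview(frame)
    if view.nbytes == 0:
        return [b""]
    return [bytes(view[i:i + limit]) for i in range(0, view.nbytes, limit)]

frames = [b"x" * 5000]
chunks = [c for f in frames for c in frame_split_size_sketch(f)]
assert [len(c) for c in chunks] == [2048, 2048, 904]

# Each (smaller) chunk can now be compressed independently.
compressed = [zlib.compress(c) for c in chunks]
assert b"".join(chunks) == frames[0]
```

After this step the frame count no longer matches the serializer's original structure, which is exactly why the recorded `lengths` are needed at merge time.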
distributed/protocol/utils.py
Outdated
```python
if len(lengths) == 1:
    return [b"".join(map(ensure_bytes, frames))]
```
Or do you mean you would like to see something like this?
```
-if len(lengths) == 1:
-    return [b"".join(map(ensure_bytes, frames))]
+if len(lengths) == 1 and len(frames) == 1:
+    return frames
```
Yes, return `ensure_bytes(frames[0])` (although I don't see how it could not be bytes).
I think we have `bytearray` in the TCP code path and NumPy arrays in the UCX code path. So coercing to `bytes` can make sense. Used a `list` to wrap it as well, since that seems to be expected (based on related testing before).
```
-if len(lengths) == 1:
-    return [b"".join(map(ensure_bytes, frames))]
+if len(lengths) == 1 and len(frames) == 1:
+    return [ensure_bytes(frames[0])]
```
yep, agree
That said, I guess `merge_frames` is collecting some frames unaltered, some as `memoryview`s, and some as `bytes`. So maybe we can just pass `frames` through as-is? Would potentially save us a copy.
I don't know about that, depends if all callers explicitly expect bytes or not.
Just for context, here are the lines I'm looking at:

My guess is the only expectation is that frames must be `bytes`-like, which they are when inputted or when coerced to `bytes` or `memoryview`s. So I think we are safe to pass the frames as-is.
Maybe a good use case for typing, even though I'm not sure how you say "bytes-like".
(that was just musing, not suggesting a change)
Good question. I guess it's an open issue ( python/typing#593 ).
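For what it's worth, one pragmatic way to annotate "bytes-like" today is a union of the common buffer types (Python 3.12's `collections.abc.Buffer`, from PEP 688, is the more general answer). A sketch, with the alias and helper names invented for illustration:

```python
from typing import Union

# "Bytes-like" spelled as a union of the common buffer types; anything in
# this union can be wrapped in a memoryview without copying.
BytesLike = Union[bytes, bytearray, memoryview]

def total_nbytes(frames: list[BytesLike]) -> int:
    """Sum the byte sizes of a list of bytes-like frames."""
    return sum(memoryview(f).nbytes for f in frames)

assert total_nbytes([b"ab", bytearray(b"cd"), memoryview(b"ef")]) == 6
```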
Alright, I think the latest changes address the last round of reviews. Please let me know if there's anything else 🙂
distributed/protocol/utils.py
Outdated
```
@@ -67,6 +67,9 @@ def merge_frames(header, frames):
    if all(len(f) == l for f, l in zip(frames, lengths)):
        return frames

    if len(lengths) == 1 and len(frames) == 1:
```
I think I may have been stupid here: is there any way that this condition can be triggered, given the assert and the previous if? If there is only one frame and one length, that length must be the same as the length of the one frame.
If this block came before the previous, then we would avoid calculating lengths again - which should be instantaneous in any case, so should we just get rid of it?
(Indeed, the `if all...` condition also catches the `if not frames` above, and would be fast to iterate over a zero-length list; and the assert is only needed after that if...)
Actually, I think you were right to request this, and it can come up. For frames that are not too large, and for situations where only a single frame is needed, this could be quite common. For example, NumPy arrays would pass through this path. So having a fast path makes sense. This saves us going through the `while`, which does a fair bit of work.

Well, we would hope the frame is the same size as the length specified. However, that might not be true if the message was truncated or otherwise corrupted (and I think this is why the `assert`s are there). So having the `assert`s makes sense. If we moved this before the `assert`s, I think we should add a check to ensure the frame is the expected length.

I might not be following this last bit, but I think it could make sense to move the `if not frames` after the `assert`s or drop it altogether. No strong thoughts here. Happy to leave it as-is.
Ah I see what you mean (I think). This check already acts as a fast path. Yeah we can drop our case.
Ok, pushed two commits that drop these:
- This is already handled in the check above.
- This is already handled by the fast path for more than zero frames.
+1 here, sorry for the back-and-forth.
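The conclusion above, that the existing length check already serves as the fast path, can be sketched like this (illustrative rather than the actual `merge_frames`):

```python
# Sketch: if every frame already matches its recorded length, nothing was
# split, so the frames pass through unaltered (no copies). Otherwise chunks
# are rejoined to the recorded lengths. Not the actual distributed code.
def merge_frames_sketch(header, frames):
    lengths = header["lengths"]
    if len(frames) == len(lengths) and all(
        len(f) == l for f, l in zip(frames, lengths)
    ):
        return frames  # fast path: covers the common single-frame case too

    out, it = [], iter(frames)
    for length in lengths:
        buf = b""
        while len(buf) < length:
            buf += bytes(next(it))
        out.append(buf)
    return out

frames = [b"abcd", b"efgh"]
assert merge_frames_sketch({"lengths": (4, 4)}, frames) is frames  # untouched
assert merge_frames_sketch({"lengths": (8,)}, frames) == [b"abcdefgh"]
```

Note the fast path returns the input list itself, so unsplit frames (including a lone unsplit frame) incur no join and no copy.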
```python
2 ** 26 * b"ab",
(2 ** 25 * b"ab", 2 ** 25 * b"ab"),
```
Beware here that I've seen Python pre-compute literals like these as an unfortunate optimization, which results in large constant values in the program code. It might be wise to call a function around `2 ** 26`, like `int(2 ** 26)`, even though it has no semantic effect.

My bad experience with this was a long time ago, so maybe things have improved.
Hmm... yeah, I'm not sure. Did push some `int` wrappers around these though. One of these cases was already here (so we may have already been ok).
Ah ok, probably not a problem then. Thanks again for resolving the full issue quickly.
Of course 🙂
FWIW these changes seem fine to me.

Not at all. Thanks for the feedback! 😄

At least for me, the Travis CI status is not being shown on GitHub (not sure why), but it did run and pass.

It is visible and has a green check mark for me. Merging.

Great! Yeah, it now shows up for me too. Not sure what was up. Thanks all! 😄
Fixes #3851

It appears that we are splitting frames in `serialize_bytelist` so that we can compress them. However, we are not merging them back together afterwards during deserialization. This can cause an exception to be raised by serializers that expected their frames to be structured in a particular way.

To fix this, we make sure to call `merge_frames` after `decompress` in `deserialize_bytes`. Further, we make sure to pack the `lengths` of the original `frames` in the `header` (if not already present) in `serialize_bytelist`. This should ensure deserializers get the original frame structuring back when they operate on them.