Shortcut to_frames() #4480
Force-pushed from dabaea3 to da7a7bc
Thanks Mads! 😄
Added a few minor comments below.
else:
    compress_opts = {}
try:
    return list(dumps_msgpack(msg, **compress_opts))
Should already be a list, so we can just return it:
Suggested change:
-    return list(dumps_msgpack(msg, **compress_opts))
+    return dumps_msgpack(msg, **compress_opts)
@@ -46,6 +105,22 @@ def _to_frames():
    else:
        msg_size = 0

    if is_msgpack_serializable(msg):
        # print("to_frames() SIMPLE: ", repr(msg))
Guessing this is just left over from debugging
)
else:
    pass
    # print("to_frames() NON-SIMPLE: ", repr(msg))
Guessing this is also just left over from debugging
if "task" in msg: | ||
return False | ||
if "data" in msg: | ||
return False | ||
if msg.get("status", "OK") != "OK": | ||
return False | ||
if len(msg.get("worker-plugins", ())) > 0: | ||
return False | ||
if msg.get("actors", ()): | ||
return False |
Would it be worth adding a comment above explaining these? Generally they make sense, but it might be worth guiding new readers
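For illustration, a hedged sketch of what such a comment block might look like, wrapped in a standalone helper here (the helper name and the comment wording are only a guess at the intent of each check):

```python
def _looks_msgpack_safe(msg: dict) -> bool:
    # Messages with these keys/values may carry arbitrary Python objects
    # (task specs, user data, exceptions, plugin state, actor handles),
    # which msgpack alone cannot encode, so they take the serializer path.
    if "task" in msg:  # task specs can embed callables and their arguments
        return False
    if "data" in msg:  # data payloads can be any Python object
        return False
    if msg.get("status", "OK") != "OK":  # error statuses often carry exceptions
        return False
    if len(msg.get("worker-plugins", ())) > 0:  # plugins are pickled objects
        return False
    if msg.get("actors", ()):  # actor handles are not msgpack friendly
        return False
    return True
```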
if op in (
    "actor_execute",
    "actor_attribute",
    "plugin-add",
    "proxy",
    "task-erred",
):
Thoughts on putting this list in a global variable above? That may make it easy to add to as well.
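A minimal sketch of that idea (the constant and helper names are hypothetical):

```python
# Hypothetical module-level constant: ops whose messages may carry payloads
# that msgpack alone cannot handle, so they must take the regular path.
NON_MSGPACK_OPS = frozenset(
    {
        "actor_execute",
        "actor_attribute",
        "plugin-add",
        "proxy",
        "task-erred",
    }
)


def op_needs_serializers(op: str) -> bool:
    # Adding a new op is now a one-line change to the constant above.
    return op in NON_MSGPACK_OPS
```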
One of the things that @jakirkham had mentioned recently was that instead of short-circuiting this check we might extend msgpack with custom serializers instead. Thoughts?
Mads and I discussed this as well. We agree that it is still important, but not something we want to pick up as a first pass. This seemed useful already, as it gets at the heart of the problem, which is fast-pathing Scheduler and Worker status messages. The custom serializers are probably a larger rework of serialization as a whole (though likely still valuable, as it would cut down overhead on all messages, even those with non-MsgPack-friendly objects). Feel free to correct me on any of this, Mads.
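For reference, a rough sketch of what "extending msgpack with custom serializers" could look like using msgpack's `default`/`ext_hook` mechanism; the ext type code and the pickle fallback are assumptions for illustration, not what distributed does today:

```python
import pickle

import msgpack

FALLBACK_EXT_CODE = 42  # arbitrary application-chosen ext type code


def _encode_fallback(obj):
    # Called by msgpack for any object it cannot serialize natively.
    return msgpack.ExtType(FALLBACK_EXT_CODE, pickle.dumps(obj))


def _decode_fallback(code, data):
    if code == FALLBACK_EXT_CODE:
        return pickle.loads(data)
    return msgpack.ExtType(code, data)


def dumps(msg) -> bytes:
    return msgpack.packb(msg, default=_encode_fallback, use_bin_type=True)


def loads(frame: bytes):
    return msgpack.unpackb(frame, ext_hook=_decode_fallback, raw=False)
```

With something like this, every message can go through a single msgpack pass, and only the objects msgpack cannot handle pay the extra cost.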
@@ -19,6 +21,63 @@
OFFLOAD_THRESHOLD = parse_bytes(OFFLOAD_THRESHOLD)


def is_msgpack_serializable(msg) -> bool:
This approach feels brittle to me. It's like we're reverse engineering our own protocol in order to determine if something is fast or not. This feels off.
cc @jcrist
Another option we discussed is maybe having a class MsgPackSerializable, which the user sending the message could place their message in. We could then just check for that class and use MsgPack on it in the fast path.
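A hedged sketch of that wrapper-class idea (names and placement are hypothetical):

```python
class MsgPackSerializable:
    """Marker wrapper: the sender promises the payload is plain msgpack types."""

    def __init__(self, data):
        self.data = data


def fast_path_payload(msg):
    # Return the payload for the msgpack fast path, or None for the slow path.
    if isinstance(msg, MsgPackSerializable):
        return msg.data
    return None


# Usage: a handler that knows its message is simple opts in explicitly.
payload = fast_path_payload(MsgPackSerializable({"op": "heartbeat", "now": 123.4}))
```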
This is to infer if a message contains objects that need to pass through the slow path of the serializers? I suspect most handlers are consistent in whether they send only simple objects or complex objects. A bool kwarg passed through the proper rpc/write methods would be much simpler and likely sufficient.
Pseudocode; not suggesting comm.write is the correct place for this, or that the kwarg name is a good one:
await comm.write({"simple": "message"}, use_serializers=False) # skip the serializers
await comm.write(some_complex_unknown_object, use_serializers=True) # use the serializers
Yeah, I think Matt played with a flag to write and had inconsistent results. IOW it's not clear the message is MsgPack serializable by the time it reaches write. Please feel free to correct me, Matt.
This becomes more apparent when looking at the number of things using BatchedSend, for example. I think we need to flag the message before we reach write if we don't want to handle it this way.
I'm not sure that I ever tried this explicitly, but yeah, I can imagine that it would get tricky, especially with BatchedSend because we might have a mix.
We could include the "fast": True key-value pair in the message itself and pop it off as it goes by. For lists we would have to iterate through the entire list. It's not my favorite solution, but it seems more explicit than what is here.
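A rough sketch of that flag-in-the-message idea, including the list case (the helper name and flag handling are made up for illustration):

```python
def pop_fast_flag(msg) -> bool:
    """Pop an explicit "fast" marker off a message (or a batch of messages).

    Returns True only if every message in a batch was marked fast, since a
    BatchedSend payload may mix fast and slow messages.
    """
    if isinstance(msg, list):
        flags = [pop_fast_flag(m) for m in msg]  # pop the flag off every element
        return bool(flags) and all(flags)
    if isinstance(msg, dict):
        return bool(msg.pop("fast", False))
    return False


batch = [{"op": "heartbeat", "fast": True}, {"op": "task-erred"}]
print(pop_fast_flag(batch))  # False -> the whole batch takes the regular path
```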
> Another option we discussed is maybe having a class MsgPackSerializable, which the user sending the message could place their message in.

The other way around might make more sense. It looks like there are around 10-15 calls to to_serialize in the codebase. Maybe we special-case these calls instead of the pure MsgPack ones (which are more common and lighter weight).
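A sketch of what that inversion might look like, assuming callers wrap complex payloads with to_serialize (which, as I understand it, returns a Serialize object), so the check only has to look for that wrapper:

```python
from distributed.protocol import to_serialize
from distributed.protocol.serialize import Serialize


def needs_serializers(msg) -> bool:
    # Only messages that explicitly wrap data via to_serialize() take the
    # slow path; everything else is assumed to be plain msgpack types.
    if isinstance(msg, Serialize):
        return True
    if isinstance(msg, dict):
        return any(needs_serializers(v) for v in msg.values())
    if isinstance(msg, (list, tuple)):
        return any(needs_serializers(m) for m in msg)
    return False


msg = {"op": "task-finished", "data": to_serialize(object())}
print(needs_serializers(msg))  # True -> use the full serializer machinery
```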
I agree, this is a hacky solution that is mainly for investigating the performance overhead of extract_serialize. @jakirkham @mrocklin, can I get one of you to benchmark the changes with the Cythonized version?
If this PR makes a significant difference on a real workload, I think we should unify all serialization into a single function that passes through the data once.
@quasiben did a run and posted info in issue (quasiben/dask-scheduler-performance#102).
More recent results are in issue (quasiben/dask-scheduler-performance#103). Please ignore the previous one.
Overall time spent in write dropped from 5.25% to 4.39%. So pretty significant actually.
Also, overall time spent in read dropped from 7.95% to 6.47%, which is itself interesting. My guess is that since we pass simple messages directly to MsgPack when sending them out, it makes them simpler (and thus faster) to deserialize on the receiving end.
Would also note that all messages that are not MsgPack compatible are going through this extra step, so this improvement is seen in spite of adding overhead in some cases.
Mads, could you please rebase this on
Force-pushed from da7a7bc to 78356ce
Superseded by #4531