Shortcut to_frames() #4480

Closed. Wants to merge 1 commit.
distributed/comm/utils.py (75 additions, 0 deletions)
@@ -1,6 +1,8 @@
from distributed.protocol.core import dumps_msgpack
import logging
import math
import socket
import warnings

import dask
from dask.sizeof import sizeof
@@ -19,6 +21,63 @@
OFFLOAD_THRESHOLD = parse_bytes(OFFLOAD_THRESHOLD)


def is_msgpack_serializable(msg) -> bool:
Member:

This approach feels brittle to me. It's like we're reverse engineering our own protocol in order to determine if something is fast or not. This feels off.

cc @jcrist

Member:

Another option we discussed is maybe having a class MsgPackSerializable, which the user sending the message could place their message in. We could then just check for that class and use MsgPack on it in the fast path.
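
A minimal sketch of how such a wrapper might look (the class name, and checking for it inside to_frames(), are hypothetical and not part of this PR):

class MsgPackSerializable:
    """Hypothetical marker wrapping a message known to be msgpack-safe."""

    def __init__(self, msg):
        self.msg = msg

# to_frames() could then take the fast path on an isinstance check instead of
# inspecting the message contents:
#
#     if isinstance(msg, MsgPackSerializable):
#         frames = dumps_msgpack(msg.msg, **compress_opts)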

Member:

This is to infer if a message contains objects that need to pass through the slow path of the serializers? I suspect most handlers are consistent in whether they send only simple objects or complex objects. A bool kwarg passed through the proper rpc/write methods would be much simpler and likely sufficient.

Pseudocode, not suggesting comm.write is the correct place for this, or that the kwarg name is a good one.

await comm.write({"simple": "message"}, use_serializers=False)  # skip the serializers

await comm.write(some_complex_unknown_object, use_serializers=True)  # use the serializers

Member:

Yeah I think Matt played with a flag to write and had inconsistent results. IOW it's not clear the message is MsgPack serializable by the time it reaches write. Please feel free to correct me Matt

This becomes more apparent when looking at the number of things using BatchedSend for example. I think we need to flag the message before we reach write if we don't want to handle it this way.

Member:

I'm not sure that I ever tried this explicitly, but yeah I can imagine that it would get tricky, especially with BatchedSend because we might have a mix.

We could include the "fast": True key-value pair in the message itself and pop it off as it goes by. For lists we would have to iterate through the entire list. It's not my favorite solution, but it seems more explicit than what is here.
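
A rough sketch of that idea (the "fast" key and the helper below are illustrative only, not proposed API):

def pop_fast_flag(msg):
    """Pop a hypothetical "fast" marker and report whether every part carried it."""
    if type(msg) is dict:
        return msg.pop("fast", False)
    if type(msg) is list:
        # For lists we have to iterate through the entire list, as noted above.
        flags = [type(m) is dict and m.pop("fast", False) for m in msg]
        return bool(flags) and all(flags)
    return False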

Member:

> Another option we discussed is maybe having a class MsgPackSerializable, which the user sending the message could place their message in.

The other way around might make more sense. It looks like there are around 10-15 calls to to_serialize in the codebase. Maybe we special case these calls instead of the pure MsgPack ones (which are more common and lighter weight).
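
One way to read that suggestion, sketched loosely (this assumes Serialize is the wrapper type behind to_serialize in distributed.protocol.serialize; the helper name is made up):

from distributed.protocol.serialize import Serialize

def contains_to_serialize(msg) -> bool:
    # Sketch: flag a message for the slow path only when it actually holds
    # to_serialize-wrapped objects, instead of trying to prove the
    # msgpack-only case as is_msgpack_serializable() does.
    if isinstance(msg, Serialize):
        return True
    if isinstance(msg, dict):
        return any(contains_to_serialize(v) for v in msg.values())
    if isinstance(msg, (list, tuple)):
        return any(contains_to_serialize(v) for v in msg)
    return False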

Contributor Author:

I agree, this is a hacky solution that is mainly for investigating the performance overhead of extract_serialize. @jakirkham @mrocklin can I get one of you to benchmark the changes with the Cythonized version?

If this PR makes a significant difference on a real workload, I think we should unify all serialization into a single function that passes through the data once.

Member:

@quasiben did a run and posted info in issue ( quasiben/dask-scheduler-performance#102 )

Member:

More recent results in issue ( quasiben/dask-scheduler-performance#103 ). Please ignore the previous one

@jakirkham (Member), Feb 4, 2021:

Overall time spent in write dropped from 5.25% to 4.39%. So pretty significant actually.

Also overall time spent in read dropped from 7.95% to 6.47%, which itself is interesting. My guess is that since we pass simple messages directly to MsgPack when sending them out, it becomes simpler (and thus faster) to deserialize them on the receiving end.

Would also note that all messages that are not MsgPack compatible are going through this extra step. So this improvement is seen in spite of adding overhead in some cases.

"""Check if a message is msgpack serializable

Parameters
----------
msg: Any
Message to check
"""
    typ = type(msg)

    # Only lists and dicts can contain non-msgpack-serializable objects
    if typ is not list and typ is not dict:
        return True

    if len(msg) == 0:
        return True

    if typ is not list:
        msg_list = (msg,)
    else:
        msg_list = msg

    for msg in msg_list:
        if type(msg) is not dict:
            return False

        if set(msg.keys()) == {"compression", "python", "pickle-protocol"}:
            continue

        op = msg.get("op", None)
        if not op:
            return False

        if "task" in msg:
            return False
        if "data" in msg:
            return False
        if msg.get("status", "OK") != "OK":
            return False
        if len(msg.get("worker-plugins", ())) > 0:
            return False
        if msg.get("actors", ()):
            return False
Member (on lines +57 to +66):

Would it be worth adding a comment above explaining these? Generally they make sense, but it might be worth guiding new readers


        # Operations explicitly disallowed
        if op in (
            "actor_execute",
            "actor_attribute",
            "plugin-add",
            "proxy",
            "task-erred",
        ):
Member (on lines +69 to +75):

Thoughts on putting this list in a global variable above? That may make it easy to add to as well.
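
For example (the constant name is illustrative):

# Module-level constant near the top of distributed/comm/utils.py, easy to
# extend as more operations turn out to need the full serialization path.
MSGPACK_UNSUPPORTED_OPS = frozenset(
    {
        "actor_execute",
        "actor_attribute",
        "plugin-add",
        "proxy",
        "task-erred",
    }
)

# ...and inside is_msgpack_serializable():
#     if op in MSGPACK_UNSUPPORTED_OPS:
#         return False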

            return False

    return True


async def to_frames(
    msg, serializers=None, on_error="message", context=None, allow_offload=True
):
@@ -46,6 +105,22 @@ def _to_frames():
    else:
        msg_size = 0

    if is_msgpack_serializable(msg):
        # print("to_frames() SIMPLE: ", repr(msg))
Member:

Guessing this is just left over from debugging

if context and "compression" in context:
compress_opts = {"compression": context["compression"]}
else:
compress_opts = {}
try:
return list(dumps_msgpack(msg, **compress_opts))
Member:

Should already be a list. So we can just return it

Suggested change:
-    return list(dumps_msgpack(msg, **compress_opts))
+    return dumps_msgpack(msg, **compress_opts)

        except TypeError as e:
            warnings.warn(
                "Message thought to be msgpack serializable but isn't: %s" % e
            )
    else:
        pass
        # print("to_frames() NON-SIMPLE: ", repr(msg))
Member:

Guessing this is also just left over from debugging


    if allow_offload and OFFLOAD_THRESHOLD and msg_size > OFFLOAD_THRESHOLD:
        return await offload(_to_frames)
    else: