support get_worker() and get_client() in client.run calls #7912
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
20 files +20, 20 suites +20, 11h 46m 24s ⏱️ +11h 46m 24s
For more details on these failures, see this check.
Results for commit f6a4584. ± Comparison against base commit 4a1570f.
9495fba to 44381cd
        return "{{{}}}".format(", ".join(strs))


async def run(server, comm, function, args=(), kwargs=None, wait=True):
I see you are copying this code, so that we now have three different versions of it. Is this just because of the ctx var?
If so, I could see us changing this ctx var to something that just returns the current server, such that there is a ctx var through which one can always access the Nanny, Scheduler, or Worker wherever it is called. I believe this would be useful in all sorts of cases (e.g. recently I played with graph serialization again and it would've been nice to have a way to know whether one is on the scheduler or not).
I wonder if we couldn't set this ctx var in Server.{handle_stream|handle_comm|start} and get all of this effectively for free.
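For illustration, the "current server" ctx var idea could look roughly like the sketch below. This is not distributed's code: `_current_server`, `get_server`, and the toy `Server`, `Scheduler`, and `Worker` classes are hypothetical stand-ins, and the sketch assumes the variable is set around each handler invocation in something like `handle_comm`.

```python
import asyncio
import contextvars

# Hypothetical context variable; the name is an assumption for this sketch.
_current_server = contextvars.ContextVar("current_server", default=None)


def get_server():
    """Return the server whose handler is currently running."""
    server = _current_server.get()
    if server is None:
        raise ValueError("no server is active in this context")
    return server


class Server:
    """Toy stand-in for a server base class (not distributed.core.Server)."""

    async def handle_comm(self, handler):
        # Set the variable for the duration of the handler, then restore it.
        token = _current_server.set(self)
        try:
            return await handler()
        finally:
            _current_server.reset(token)


class Scheduler(Server):
    pass


class Worker(Server):
    pass


async def where_am_i():
    # User code can ask which kind of server it is running on.
    return type(get_server()).__name__


async def main():
    scheduler, worker = Scheduler(), Worker()
    # gather() runs each coroutine in its own task, and each task gets its
    # own copy of the context, so the two servers do not see each other.
    return await asyncio.gather(
        scheduler.handle_comm(where_am_i),
        worker.handle_comm(where_am_i),
    )


results = asyncio.run(main())
```

Because each asyncio task carries its own context, concurrent handlers on different servers in the same process would each see their own server, which is the property that makes a single shared variable plausible here.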
For a few reasons, actually:
- I wanted to fix incorrectly injecting "dask_scheduler" into client.run calls and "dask_worker" into client.run_on_scheduler calls
- run calls always logged to "distributed.worker", which means they didn't show up in the scheduler logs
- and yes, the contextvar code now makes worker.run quite a bit different from nanny.run and scheduler.run
So I think for this PR the duplication is worth it for now, and we can think about cleaning it up and/or making a Server cvar in a future PR.
"so I think for this PR it's worth the duplication for now and we can think about cleaning up the duplication and/or making a Server cvar in a future PR"
nit: Should we add an issue for this just to track it? I can see the duplication hurting us long-term if we're not careful.
… up on perf reports
a301e1d to b2e7210
run_coroutine is now a deprecated alias for run
def convert_args_to_str(args: tuple[object, ...], max_len: int | None = None) -> str:
    """Convert args to a string, allowing for some arguments to raise
    exceptions during conversion and ignoring them.
    """
    length = 0
    strs = ["" for i in range(len(args))]
    for i, arg in enumerate(args):
        try:
            sarg = repr(arg)
        except Exception:
            sarg = "< could not convert arg to str >"
        strs[i] = sarg
        length += len(sarg) + 2
        if max_len is not None and length > max_len:
            return "({}".format(", ".join(strs[: i + 1]))[:max_len]
    else:
        return "({})".format(", ".join(strs))


def convert_kwargs_to_str(kwargs: dict, max_len: int | None = None) -> str:
    """Convert kwargs to a string, allowing for some arguments to raise
    exceptions during conversion and ignoring them.
    """
    length = 0
    strs = ["" for i in range(len(kwargs))]
    for i, (argname, arg) in enumerate(kwargs.items()):
        try:
            sarg = repr(arg)
        except Exception:
            sarg = "< could not convert arg to str >"
        skwarg = repr(argname) + ": " + sarg
        strs[i] = skwarg
        length += len(skwarg) + 2
        if max_len is not None and length > max_len:
            return "{{{}".format(", ".join(strs[: i + 1]))[:max_len]
    else:
        return "{{{}}}".format(", ".join(strs))
IIUC, this has just been moved from worker.py.
            return error_message(e)
        return {"status": "OK", "result": to_serialize(result)}

    async def run_coroutine(self, comm, *, function, args, kwargs, wait):
Is it intended that we have made args, kwargs and wait required now? I would've expected them to remain optional. Also, function is now a keyword-only argument. Is it possible to maintain the signature and just introduce the deprecation instead?
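One way to keep the old call signature while deprecating the name is sketched below. It is only an illustration under assumptions: the toy `Worker` class and its simplified `run` body are stand-ins, not the PR's implementation, and the warning text is made up.

```python
import asyncio
import inspect
import warnings


class Worker:
    """Toy stand-in; only the method signatures matter for this sketch."""

    async def run(self, comm, function, args=(), kwargs=None, wait=True):
        # Simplified body: call the function and await it if needed.
        result = function(*args, **(kwargs or {}))
        if inspect.isawaitable(result):
            result = await result
        return {"status": "OK", "result": result}

    async def run_coroutine(self, comm, function, args=(), kwargs=None, wait=True):
        # Same positional/optional parameters as before, plus a warning.
        warnings.warn(
            "run_coroutine is deprecated; use run instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return await self.run(comm, function, args=args, kwargs=kwargs, wait=wait)


async def add_one(x):
    return x + 1


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # Old-style call: function passed positionally, kwargs and wait omitted.
    response = asyncio.run(Worker().run_coroutine(None, add_one, (1,)))
```

With this shape, existing callers that pass `function` positionally or omit `args`, `kwargs`, and `wait` keep working and only see a `DeprecationWarning`.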
    ],
)
@gen_cluster(client=True, nthreads=[])
async def test_run_exception(c, s, Worker, nanny, address_attr, log_name):
nit: This test has become quite a handful to read. Would it make sense to split this up into smaller and more descriptive parts?
Closes #7763
pre-commit run --all-files