Add C APIs for `transition_*` functions (#4650)

Conversation
As C APIs cannot be generated for functions that use `**kwargs` and `**kwargs` are not used in these functions, just drop them.
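As a plain-Python illustration of the trade-off (hypothetical function names, not the actual scheduler code): dropping an unused `**kwargs` turns silently-ignored extra keywords into hard errors, which is exactly the failure mode the CI tracebacks below surface.

```python
# Hypothetical stand-ins, not the real scheduler transition functions.
def transition_with_kwargs(key, **kwargs):
    # Old style: silently accepts (and ignores) any extra keyword arguments,
    # but the `**kwargs` prevents Cython from generating a C-level signature.
    return ("memory", key)

def transition_strict(key):
    # New style: explicit signature. Extra keywords now fail loudly, and the
    # function can be given a fast C calling convention.
    return ("memory", key)
```

Calling `transition_strict("x", status="OK")` raises `TypeError`, so any caller still forwarding stray keywords has to be cleaned up.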
Seeing this on CI:

```
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4828, in handle_task_finished
    r: tuple = self.stimulus_task_finished(key=key, worker=worker, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4214, in stimulus_task_finished
    r: tuple = parent._transition(key, "memory", worker=worker, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1864, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_memory() got an unexpected keyword argument 'status'
```

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2224608533
Seeing this on CI:

```
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4828, in handle_task_finished
    r: tuple = self.stimulus_task_finished(key=key, worker=worker, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4214, in stimulus_task_finished
    r: tuple = parent._transition(key, "memory", worker=worker, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1864, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_memory() got an unexpected keyword argument 'thread'
```

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2224796235
Seeing this on CI:

```
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4836, in handle_task_finished
    r: tuple = self.stimulus_task_finished(key=key, worker=worker, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4222, in stimulus_task_finished
    r: tuple = parent._transition(key, "memory", worker=worker, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1872, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_memory() got an unexpected keyword argument 'metadata'
```

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2224960448
Seeing this on CI:

```
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 668, in log_errors
    yield
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 3873, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4959, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4848, in handle_task_erred
    r: tuple = self.stimulus_task_erred(key=key, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4271, in stimulus_task_erred
    **kwargs,
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1873, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_erred() got an unexpected keyword argument 'worker'
```

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2225283910
Seeing this on CI:

```
distributed.core - ERROR - transition_processing_erred() got an unexpected keyword argument 'startstops'
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4848, in handle_task_erred
    r: tuple = self.stimulus_task_erred(key=key, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4264, in stimulus_task_erred
    r = parent._transition(
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1873, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_erred() got an unexpected keyword argument 'startstops'
```

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2225419119
Seeing this on CI:

```
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 573, in handle_stream
    handler(**merge(extra, msg))
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4854, in handle_task_erred
    r: tuple = self.stimulus_task_erred(key=key, **msg)
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 4270, in stimulus_task_erred
    r = parent._transition(
  File "/home/runner/work/distributed/distributed/distributed/scheduler.py", line 1873, in _transition
    a: tuple = func(key, *args, **kwargs)
TypeError: transition_processing_erred() got an unexpected keyword argument 'text'
```

https://github.com/dask/distributed/pull/4650/checks?check_run_id=2225540455
@mrocklin could you please weigh in on how we should handle these extra arguments?
In some of these cases I suspect that we do actually use these keyword arguments, but we use them in plugins. Maybe we need to send an extras dict?
Also, thanks to @jrbourbeau for pointing me to this. For reference @jakirkham my e-mail is no longer reliably read. If you're blocked on me specifically then you're probably better served by trying to connect out-of-band. My apologies for the lack of responsiveness.
Or, maybe easier would be to just include them as explicit keywords even if they aren't used in these particular methods.
I'm enthusiastic about this change by the way. I look forward to seeing if it has an impact.
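That option might look roughly like this (a sketch; parameter names are taken from the tracebacks above, not the actual final signatures):

```python
# Sketch of accepting-but-ignoring the extra keywords.
def transition_processing_memory(
    key,
    worker=None,
    status=None,
    thread=None,
    metadata=None,
    startstops=None,
    text=None,
):
    # Only `key` and `worker` matter to this transition; the rest are
    # declared so callers (and plugins) can keep passing them without
    # a TypeError, while the signature stays C-friendly.
    return ("memory", key, worker)
```

With this shape, `transition_processing_memory("t", worker="w1", startstops=[])` succeeds even though `startstops` is unused.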
If these are meant for plugins called in …
Oh yeah, maybe. That makes sense. Presumably the SchedulerState class is intended to be stripped down anyway. That sounds like a great solution.
Yeah I think that the next thing is making a clean break between Scheduler and SchedulerState (instead of Scheduler inheriting from SchedulerState)
To be clear, my understanding is that that change would not be necessary in order to implement the changes here though. That's more future-looking. Is that correct?
Yeah that's future looking and unrelated to this
@jakirkham chiming in to say I'd be interested in helping in this area (if there's room for it!). Would be happy to have a quick chat about status of/plans for the effort if you think it would be worthwhile
Communication is one area we could improve that would help. Shared some thoughts in this comment ( #4513 (comment) )
distributed/scheduler.py (outdated)

```python
plugin.transition(
    key,
    start,
    finish2,
    *args,
    status=status,
    thread=thread,
    metadata=metadata,
    worker=worker,
    startstops=startstops,
    text=text,
    **kwargs,
)
```
I don't think this is a dealbreaker but that could be perceived as a breaking change, couldn't it? At least this is a subtle change in the required signature. So far we document this as `*args, **kwargs`, see https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.SchedulerPlugin.transition
I don't think so. These are already passed as keyword arguments and we are continuing to pass them that way
That said, Idk if this is how things will shake out yet. Things are still changing here. IOW I'm not sure this is worth reviewing yet
+1 to what John said. I think that this is a no-op if you consider the extraction of these from kwargs above.
This reverts commit fbdf587.
This seems to be used by plugins even though it is not used in this transition. So just catch the argument anyways.
Needed by some Scheduler plugins.
Think the other issues are fixed. Though started seeing …
The transition `func` that was being grabbed depended on whether `key` was found in the recommendations from the last transition. If `key` was found, one function based on `v` was called. If not, `finish` was used in place of `v`. To make this a bit more explicit, simple, and efficient, assign `finish` to `v` if `key` is not found. This way `v` can be used in all cases with one call to retrieve the `func`.
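A minimal sketch of the loop shape described here (hypothetical names; the real logic lives in the scheduler's transition machinery):

```python
def process_recommendations(key, finish, recommendations, transitions):
    # If the last transition recommended a new state for `key`, use it;
    # otherwise fall back to `finish`. Assigning into one variable means a
    # single code path and one lookup to retrieve `func`.
    v = recommendations.get(key, finish)
    func = transitions[v]
    return func(key)
```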
I see errors connected to the transitioning code in the ubu-py3.7 tests:
https://github.com/dask/distributed/pull/4650/checks?check_run_id=2340272649
Yeah I pushed some more changes last night after fixing some errors. So there may be some new ones
distributed/scheduler.py (outdated)

```python
if finish is "released":  # noqa: F632
    return self.transition_erred_released(*args, **kwargs)
elif finish is "forgotten":  # noqa: F632
    return self.transition_released_forgotten(*args, **kwargs)
```
From @mrocklin's comment ( 8ff10d9#r49507747 ).
Does this change result in a meaningful speedup? This seems significantly more complex to me. I agree that it's worth doing, but only if it has an impact. If possible I think that we should try to measure the impact of this PR before merging. Thoughts?
(pasted here as the comment wasn't in the PR)
So the goal of this change is to try and cut down on time spent calling the `transition_*` functions themselves by moving those calls to C. To benefit from adding C APIs to those functions we need to call them directly. Doing it indirectly will just go through Python so won't have any benefit. IOW something like this will be needed.

We could flatten the `if`s by comparing a single `str` (maybe like `"{start} {finish}"`?) to make this a bit simpler/more readable. May play with this a bit. Am open to other suggestions to improve readability/maintainability here
It's worth noting that today this lives in a `dict` that is roughly as long as this code block
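For concreteness, a toy sketch of the two dispatch shapes being compared (handler names are hypothetical):

```python
def released_waiting(key):
    return "waiting"

def waiting_processing(key):
    return "processing"

# Today: a dict keyed on (start, finish) pairs.
TRANSITIONS = {
    ("released", "waiting"): released_waiting,
    ("waiting", "processing"): waiting_processing,
}

# Proposed: one concatenated string compared in a flat if-chain, so a
# compiler like Cython can emit direct calls instead of a dict lookup.
def dispatch(start, finish, key):
    start_finish = f"{start} {finish}"
    if start_finish == "released waiting":
        return released_waiting(key)
    elif start_finish == "waiting processing":
        return waiting_processing(key)
    raise KeyError(start_finish)  # mirror the dict's KeyError on a miss
```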
Should add, currently Cython is passing `*args` and `**kwargs` through the Python API atm. So there are probably additional tweaks needed to get Cython to move those function calls to C

The comparisons are already pointer comparisons so that part is being handled efficiently
Yeah, I understand why, from first principles, this is a good idea. I'm curious what impact it has on benchmarks and performance.
If this improves benchmarks by a percent or two then awesome, let's go for it. If it's only shaving off a couple of nanoseconds then maybe we shouldn't do this.
I think that right now our development flow is to make some changes, merge them in, and then see how benchmarks respond. I'm suggesting that we may want to roll those benchmarks in earlier in the process so that we make changes, look at how they change benchmarks/profiling, and then based on those improvements we decide to merge.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think that is fine. Was just trying to provide more context as well as make sure we are comfortable with this kind of change before diving deeper. Sounds like we are ok with it in principle, but would like to make sure it is delivering value, which makes sense to me
@jakirkham I've noticed you active on github again. Welcome back from vacation? I hope that you had a relaxing time.
I'm curious to learn how much impact this PR had. I suspect that this is already on your queue of work coming back (which I imagine is long) but I thought I'd throw a 👍 on it anyway.
Yeah I had another idea on how to improve things here, but as guessed other things are at the forefront of the queue atm 😅
While this function is not defined in Python, it is defined in Cython. Even though flake8 complains about it not being defined, flake8 doesn't know anything. So just disable this flake8 error.
Avoids needing to `intern` 2 `str`s using one instead. Also performs only 1 `str` comparison before making a function call instead of 2.
A `str` literal with `-` does not automatically get `intern`ed in Python. Though `str`s with `_` do. So replace `-` with `_` to leverage `intern`ing of literal `str`s.
Regarding the `intern`ing:

```python
In [1]: s1 = "processing"
In [2]: s2 = "processing"
In [3]: s1 is s2
Out[3]: True
```

However it seems:

```python
In [1]: s1 = "no-worker"
In [2]: s2 = "no-worker"
In [3]: s1 is s2
Out[3]: False
```

This is a non-issue when using `_`:

```python
In [1]: s1 = "no_worker"
In [2]: s2 = "no_worker"
In [3]: s1 is s2
Out[3]: True
```

So replaced `-` with `_`.
Forgive some naive questions here.
From a new perspective, I think this is largely good. I wonder if we can prove that the string concatenation is actually faster.
```
@@ -1975,7 +1966,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState:
    # State Transitions #
    #####################

    def _transition(self, key, finish: str, *args, **kwargs):
    def _transition_dispatch(self, start: str, finish: str, *args, **kwargs):
```
`@optimize.use_switch(True)` is default, I believe, but maybe worth declaring explicitly.
```
@@ -1975,7 +1966,50 @@ def new_task(self, key: str, spec: object, state: str) -> TaskState:
    # State Transitions #
    #####################

    def _transition(self, key, finish: str, *args, **kwargs):
    def _transition_dispatch(self, start: str, finish: str, *args, **kwargs):
        start_finish: str = sys_intern(f"{start}_{finish}".replace("-", "_"))
```
I'd be interested in a speed test of this approach versus `hash`/`dict` lookup or actual `strcmp` (which is what I assume Cython does when both operands are `char*`). For the latter, you don't need the string manipulation to make the compound, and the if statements can be grouped.
```diff
         b_recs: dict
         b_cmsgs: dict
         b_wmsgs: dict
-        b: tuple = func(key)
+        b: tuple = self._transition_dispatch(
```
Don't need `b` here, can combine with next line.
```diff
@@ -2070,7 +2102,7 @@ def _transition(self, key, finish: str, *args, **kwargs):
             start = "released"
         else:
-            raise RuntimeError("Impossible transition from %r to %r" % start_finish)
+            raise RuntimeError(f"Impossible transition from {start} to {finish}")
```
Would change the logic a little, but it would make sense to me to have `_transition_dispatch` raise this error if none of the cases match.
```
    def _transition(
        self,
        key,
```
Doesn't `key` have a type?
```diff
@@ -2213,7 +2256,9 @@ def transition_released_waiting(self, key):
                 pdb.set_trace()
             raise

+    @ccall
     def transition_no_worker_waiting(self, key):
```
Why all `ccall` rather than `cfunc`? Can they be called from anywhere but `_transition`?
```
    if self._validate:
        assert worker
```
good move
To cut down on the overhead of calling `transition_*` functions, add C APIs for these functions to allow Cython to call them with C.