Remove dumps_task
#8067
Conversation
Unit Test Results: see the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests.

20 files ±0, 20 suites ±0, 11h 35m 47s ⏱️ +39m 57s. For more details on these failures and errors, see this check.

Results for commit 43b261f; comparison against base commit ef6d4bf. This pull request removes 12 tests and adds 17 (renamed tests count towards both).

♻️ This comment has been updated with latest results.
Force-pushed from 26041ef to 0dbbaa8 (compare)
```python
class Refcount:
    "Track how many instances of this class exist; logs the count at creation and deletion"

    count = 0
    lock = dask.utils.SerializableLock()
    log = []

    def __init__(self):
        with self.lock:
            type(self).count += 1
            self.log.append(self.count)

    def __del__(self):
        with self.lock:
            self.log.append(self.count)
            type(self).count -= 1
```
This test is interesting. This PR does not change anything in terms of scheduling, ordering, etc., but this test still fails quite reliably. It seems as if `Refcount` relies on explicit garbage collection. This is something I want to look into a little more, since we've been seeing a lot of GC warnings recently. However, for the sake of this PR, I rewrote it to count keys in data instead of relying on GC. Eventually, I think both tests would make sense.
This is really a weird case and somehow connected to how this object is defined in a local context.
I looked pretty closely, but I cannot find any cyclic references. In fact, I see fewer objects actually tracked by GC than this counter would have us believe. I know that CPython guarantees that `__del__` is called, and called only once, but I believe there are some caveats about when this happens.
Tests look good, and considering the large reduction in complexity, I suggest moving forward unless benchmarking raises a red flag (A/B currently running; manual tests haven't shown any anomalies).
Well, benchmarks are mostly happy: https://github.com/coiled/benchmarks/suites/14979719727/artifacts/854911855

Some rather common operations are 20-30% faster! Some tests (primarily the parquet tests) are slightly negatively impacted. I suspect this is because we're no longer caching parts of the deserialization, but I haven't verified that.

Average memory also looks good (this is interesting...). We do see a couple of jumps in peak memory usage. I suspect the memory changes are more or less an artifact of subtle timing changes, but again I haven't verified.

The very large outlier in wall time is `test_single_future`, which I strongly suspect suffers from the removed cache. The absolute change is minimal, but the relative one is large.
Thinking about these results for a moment, I suspect the improved runtime comes mostly from the removal of the deserialization step (distributed/worker.py, lines 2275 to 2284 at 1f8a11c).

While deserializing tasks, we were basically already blocking a task slot on the state machine even though the threadpool was idling. (I was recently also thinking about "oversubscribing" the state machine, i.e. `state_machine.nthreads > TPE.max_workers`, to create some pressure and keep the TPE busy; very different ticket, of course.)
Thanks, @fjetter! I <3 the reduction in complexity.
```python
assert not function and not args and not kwargs
function = execute_task
args = (task,)
```

```python
def _normalize_task(task: Any) -> T_runspec:
```
nit: It feels like this should live in `utils` (and maybe be public?) instead of `worker`, given that we use it on the scheduler as well.
I don't want this to be public. This is merely a translation layer between the garbage outside and the clean representation within.
Eventually this should become dask/dask#9969
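For illustration only, here is a sketch of what such a translation layer can look like. `TaskSpec`, `normalize_task`, and the accepted input shapes are assumptions invented for this example; the real `_normalize_task` and `T_runspec` in the PR differ.

```python
from typing import Any, Callable, NamedTuple

class TaskSpec(NamedTuple):
    # Hypothetical canonical form; not the PR's actual T_runspec.
    function: Callable
    args: tuple
    kwargs: dict

def normalize_task(task: Any) -> TaskSpec:
    """Translate assorted incoming task shapes into one canonical form:
    arbitrary "garbage" outside, a single clean representation within."""
    if isinstance(task, TaskSpec):
        return task
    if callable(task):  # a bare callable with no arguments
        return TaskSpec(task, (), {})
    if isinstance(task, tuple) and task and callable(task[0]):
        # dask-style (func, *args) tuple
        return TaskSpec(task[0], tuple(task[1:]), {})
    raise TypeError(f"cannot normalize task of type {type(task).__name__}")

# All downstream call sites now deal with exactly one shape:
assert normalize_task(abs) == TaskSpec(abs, (), {})
assert normalize_task((max, 1, 2)) == TaskSpec(max, (1, 2), {})
```

The payoff of this pattern is that every branch on task shape collapses into one function at the boundary, which is the complexity reduction the PR is after.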
Makes sense. I guess it being public in a private module might be preferable, but that's nit-picking.
```python
elif not isinstance(self.run_spec, SerializedTask):
    self.run_spec = SerializedTask(task=self.run_spec)
if isinstance(self.run_spec, ToPickle):
    # FIXME Sometimes the protocol is not unpacking this
```
Should we create an issue for this?
We can, but this is very low prio.
Co-authored-by: Hendrik Makait <hendrik.makait@gmail.com>
```python
cache_loads: LRU[bytes, Callable[..., Any]] = LRU(maxsize=100)


def loads_function(bytes_object):
```
cc @madsbk - It looks like we were using this function in dask-cuda (rapidsai/dask-cuda#1219)
Yes, we use it for its caching feature, but I don't think it is needed.
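For anyone needing similar behavior downstream, the caching side can be approximated with a plain `functools.lru_cache` keyed on the pickled bytes. This is a sketch of the idea, not the removed `loads_function` implementation:

```python
import pickle
from functools import lru_cache

@lru_cache(maxsize=100)  # roughly mirrors LRU(maxsize=100) above
def cached_loads(bytes_object: bytes):
    # bytes are hashable, so an identical payload deserializes only once
    return pickle.loads(bytes_object)

payload = pickle.dumps(max)
f1 = cached_loads(payload)
f2 = cached_loads(payload)
assert f1 is f2 and f1(3, 7) == 7
assert cached_loads.cache_info().hits == 1  # second call was a cache hit
```

One caveat with any such cache: entries pin the deserialized objects in memory, which is part of why removing the cache can shift memory and timing profiles.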
In versions of distributed after dask/distributed#8067 but before dask/distributed#8216, we must patch protocol.loads to include the same decompression fix.

Authors:
- Lawrence Mitchell (https://github.com/wence-)

Approvers:
- Peter Andreas Entschev (https://github.com/pentschev)

URL: #1247
This is a tangent to #8049.
I noticed that `dumps_task` is a surprisingly expensive operation (about 12% in #7998). It is also a rather significant complexity driver, and I believe it is no longer necessary now that pickle is used on the scheduler.

This PR explores what actually relies on this behavior and how much complexity we can remove by removing `dumps_task`.
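As a rough, standalone sanity check of the cost claim (the numbers and setup here are illustrative, not the PR's actual profiling), even serializing a simple callable by reference has a per-call cost that adds up when paid once per task:

```python
import pickle
import timeit

# Time 10,000 serializations of a callable. Real task specs also carry
# arguments and keyword arguments, so this understates per-task work.
elapsed = timeit.timeit(lambda: pickle.dumps(sum), number=10_000)
print(f"10,000 pickle.dumps of a callable: {elapsed:.4f}s")
```

A serialization step that runs once per task on the scheduler or worker hot path shows up quickly in profiles, which is consistent with the ~12% figure quoted above.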