Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fine performance metrics: apportion to Computations #7776

Closed
crusaderky opened this issue Apr 14, 2023 · 12 comments · Fixed by #7885
Closed

Fine performance metrics: apportion to Computations #7776

crusaderky opened this issue Apr 14, 2023 · 12 comments · Fixed by #7885
Assignees

Comments

@crusaderky
Copy link
Collaborator

crusaderky commented Apr 14, 2023

Computation objects are commonly used by third-party Scheduler plugins (e.g. Coiled Analytics) to visualize data.
They can be crudely defined as everything that happened to the cluster between two moments when it was idle.
As there should be only one active at any given time, it makes perfect sense to apportion the metrics to the Computation objects, in addition to the Scheduler's global ever-increasing counter.

@crusaderky
Copy link
Collaborator Author

Note: it's been observed that there's occasional overlap between Computation objects. This should not happen, and needs to be treated as a blocker to this ticket.

@ntabris
Copy link
Contributor

ntabris commented Apr 14, 2023

Here's an example Coiled cluster w/ overlapping computations, though it may be an odd case: https://cloud.coiled.io/clusters/194048/overview?account=dask-engineering

@crusaderky
Copy link
Collaborator Author

Here's the relevant code:

if self.total_occupancy > 1e-9 and self.computations:
# Still working on something. Assign new tasks to same computation
computation = self.computations[-1]
else:
computation = Computation()
self.computations.append(computation)

If the cluster is idle, compute() and submit() create a new Computation. Otherwise, they attach to the last one chronologically.
The easiest way to get overlapping computations is to have a task transition from memory to processing/queued/no-worker, e.g. in case of worker crash. When it does, it will retain its original Computation, while in the meantime other computations may have been started.

@fjetter @hendrikmakait how much do you trust self.total_occupancy <= 1e-9 as an indicator of "the cluster is idle"?
Can you think of other use cases where the cluster becomes idle, then there's a new brand new compute() call, and after that an old task resumes accruing activity?

@ntabris @hendrikmakait I don't think we have anything on our dashboard highlighting hard worker restarts (e.g. not graceful shutdown, not calls client.restart()) and/or recomputed tasks?

The worker crash use case is a bit of a problem. Because on one hand, adding to a computation that is no longer the latest is the correct thing to do in this case. On the other hand, though, we don't have this kind of information with the metrics coming from the heartbeat, so we can only attach to the latest computation.

@crusaderky
Copy link
Collaborator Author

crusaderky commented Apr 18, 2023

Another use case for overlapping computations:

  1. call compute() or submit(), but some tasks have unmet resource constraints
  2. wait for all tasks that can be computed to transition to memory, then submit more tasks
  3. bring online a worker that satisfies the constraints from the initial computation
>>> x = client.submit(lambda: 1, key="x", resources={"XXX": 1})
>>> y = client.submit(lambda: 1, key="y")
>>> client.cluster.scheduler.computations
deque([<Computation cb97f633-68e3-45d5-9684-3bc670b357ae: Tasks: no-worker: 1>,
       <Computation 0562da4e-71a2-4573-80dc-c9a10abedfdd: Tasks: memory: 1>],
      maxlen=100)

Again, this logic is sound but is not compatible with metrics apportion.

@crusaderky
Copy link
Collaborator Author

crusaderky commented Apr 18, 2023

Yet another way to get overlapping computations:
Tasks are associated to computations through their task group. A user could manually submit tasks that end up being detected as the same task group, but are actually submitted separately.
This shouldn't happen in dask.array or dask.dataframe.

@ntabris
Copy link
Contributor

ntabris commented Apr 18, 2023

I don't think we have anything on our dashboard highlighting hard worker restarts (e.g. not graceful shutdown, not calls client.restart()) and/or recomputed tasks?

Not really, no. What are the ways I can tell these occurred?

@crusaderky
Copy link
Collaborator Author

crusaderky commented Apr 18, 2023

I don't think we have anything on our dashboard highlighting hard worker restarts (e.g. not graceful shutdown, not calls client.restart()) and/or recomputed tasks?

Not really, no. What are the ways I can tell these occurred?

  • Worker address (port) reported to prometheus changes.
  • Temporary negative spike in number of reported cluster-wide resources.
  • Suddenly the task count goes from memory-heavy to processing/queued heavy.

None of these are nice or intuitive - we could use a dedicated prometheus counter.

@ntabris
Copy link
Contributor

ntabris commented Apr 18, 2023

Hm, okay, yeah, I don't really want to rely much on indirect signals like that. Those would be pretty hard to query on.

@crusaderky
Copy link
Collaborator Author

@fjetter @ntabris @hendrikmakait

This conversation has become very fragmented, so I'll recap everything that has been said so far here.

End goal

As an intermediate Dask user, I want to

  1. open a third-party Dask GUI (such as the Coiled dashboard)
  2. select a cluster of interest
  3. select a computation[a] of interest
  4. visualize a pie chart of how the end-to-end time of the computation has been spent by the chart

[a] Here, a computation is what a novice to intermediate dask user would intuitively intend as a monolithic group of work; which means

  1. a single synchronous call to compute(), or a call to persist() immediately followed by wait(); or
  2. a burst of submit() calls, followed by a wait(), gather(), or as_completed() to wait for all futures; or
  3. a burst of compute() and/or persist calls, which are awaited at the end all together. The most typical practical use case here is when a user calls persist() halfway through the graph definition, doesn't wait for it, continues defining the graph on top of it and finally calls compute() on the final collection.

Current state

We are currently sending from the workers to the scheduler this information through the heartbeat:

  • Fine performance metrics for execute, broken down by task prefix and activity (e.g. unspill, task-cpu, etc.)
  • Fine performance metrics for gather_dep and get_data, broken down by activity only

On the scheduler, we have Computation objects, which are aggregation of TaskGroups. At the moment, every time the scheduler receives a compute(), persist(), or submit() call from the client,

  • if the whole cluster is idle, it creates a new Computation and attaches the new TaskGroups to it;
  • otherwise, it reuses the newest Computation and appends the new TaskGroups to it.

The problem

Naively, one would think that by construction Computation objects never overlap, as there must be a moment of complete quiet on the cluster in order to change to a new one.

I believe there are at least three use cases which causes computations to overlap:

  1. a worker dies unexpectedly; the unique data that it held needs to be recomputed (Worker crash causes computations to overlap #7825)
  2. a user hand-crafts graph keys, so unrelated works ends up in the same task group (Task prefix collision causes weird behaviour in Computations #7787)
  3. almost certainly at least another use case, which I haven't identified yet, as I'm seeing task overlapping routinely in the Coiled dashboard - too frequently to be justified by the above two edge cases alone.

The phenomenon of computations overlap is an obstacle to apportioning fine performance metrics to Computation objects:

  • Metrics for execute are currently grouped by task prefix. We could easily change them to be grouped by TaskGroup, at the cost of extra verbosity. However, metrics reach the scheduler via the heartbeat, which may arrive after the TaskGroup has been forgotten. So we'd need to move them from the heartbeat to the batched comms, which would substantially increase the worker->scheduler chatter as we'd have no other option but to send them into disaggregated form.

  • Metrics for gather_dep and get_data can't even be automatically assigned to a TaskGroup due to bundling. I am aware there are some heuristics that pick an arbitrary task of the bundle but I always considered them a ugly hack so I'm not terribly happy to use them for this new feature.

  • Finally, and also most importantly, a very important piece of the metrics is the idle time. At the moment you can calculate it by hand as idle time = end-to-end elapsed time as seen by the client * number of threads - sum(execute metrics) but it would be both straightforward and very useful to bake it into the metrics themselves, so that it can be finely broken down:

The moment you have 2+ computations active at the same time, they will steal time from each other.
You can trivially apportion idle time to "whatever one computation we're currently busy with right now" but you can't apportion it to a TaskGroup.

I don't have a measure of how much idle time we currently have in coiled/benchmarks. I expect it to be frequently substantial, and heavily use case-dependent.

Observations

  • We plan to introduce a context manager (Context manager for contiguous computations #7795) which will artificially glue everything together in a single computation. It should be noted that the main motive behind it is to tag computations; the gluing effect is just a nice side-effect. I don't think we should rely on users to add this context manager to their code.

  • @fjetter suggests we should scrap the scheduler-side machinery that reuses a Computation if the cluster is not idle. Instead, we should add an ID on the client side, with a somewhat rapidly expiring timeout which is reset at every call to compute, persist, or submit. Once the timeout expires, any further call will be apportioned to a new Computation.

This design would make multitenancy scenarios more readable, at the cost of having massive amounts of "idle" time which can only be explained by looking at all the other computations going on at the same time. It would also, in my opinion, be problematic when the "multitenancy" is actually a single (human) user invoking get_client() from a seceded task - which I suspect may be a fairly common use case. We could group parent and child together by propagating the "computation id" from original client to worker_client.

@crusaderky crusaderky self-assigned this May 24, 2023
@hendrikmakait
Copy link
Member

Just throwing around a thought: Should all this detailed and analytics-focused functionality even live on the scheduler? The scheduler is both an operational component and the most prone to becoming a bottleneck in large-scale scenarios. Since what we are trying to do here is some mix of collecting and processing traces/metrics, a dedicated component might make more sense here. This could potentially help us address some of the complications we face with the current state.

@crusaderky
Copy link
Collaborator Author

@fjetter, @hendrikmakait, @ntabris and I had an extensive offline discussion about this.
The emerging consensus is that Computations are not fit for purpose - neither for fine performance metrics nor everything else - and should be scrapped and replaced.

This conversation continues on #7860

@crusaderky
Copy link
Collaborator Author

This ticket will now be closed by apportioning the fine performance metrics to Spans.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants