User-defined spans #7860

Closed
crusaderky opened this issue May 25, 2023 · 12 comments
@crusaderky
Collaborator

crusaderky commented May 25, 2023

Dask users expect some sort of grouping of tasks that is more granular than the lifetime of a whole cluster but more coarse than TaskGroup. Currently, this is delivered through Computation objects, which (AFAIK) are used by third parties only (such as Coiled).

A Computation is defined as the union of all TaskGroups that were created since the cluster last became idle. Unlike TaskGroups, which are forgotten as soon as all of their tasks are forgotten, Computations are retained semi-indefinitely in a long fixed-size deque (and retain a strong reference to all the forgotten TaskGroups).

Computations fall short of their intended goal:

  • There are many edge cases where the concept of "continuous time where the cluster is doing something" falls apart: computations that should intuitively be obsolete still accrue work, which causes overlap that is hard to understand.
  • They can't support multi-tenancy: if two independent clients submit work at the same time, it ends up in the same computation.
  • They can't support nesting: as a Dask user, I would like a fairly large grouping of tasks that breaks down into subgroupings.

I propose scrapping the Computation class and replacing it with a Span class.
What follows is an early design draft and an invitation to discussion.

High level design

  • A span is defined by the client using a context manager;
  • The context manager can be nested;
  • Each client automatically defines its own root span when it's initialised, setting it to its client_id
  • If a client is created by get_client() inside a task, it instead inherits the parent task's span.

Information can be retrieved from Spans by SchedulerPlugins, as happens today for Computations. Built-in use of Spans (e.g. by the Bokeh dashboard) is out of scope for the time being.

Usage example

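import dask
import dask.array as da
from distributed import Client

client = Client()

# dask.span is the API proposed in this issue; it is used here both as a
# decorator and as a nestable context manager.
@dask.span("base_collection")
def base_collection():
    a = da.random.random(...)
    return a + 1

with dask.span("my_workflow"):
    a = base_collection()
    with dask.span("step2"):
        b = a.sum()
    c = b * 2

c.compute()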

Low level design

dask.span()

The new context manager dask.span is a simple wrapper around dask.annotate.
A new annotation span is a tuple of all span tags so far. dask.span appends to the end of it on enter and removes from the end on exit.

In the above example, layers will have the following annotations:

  • random: {"span": (<client id>, "my_workflow", "base_collection")}
  • add: {"span": (<client id>, "my_workflow", "base_collection")}
  • sum: {"span": (<client id>, "my_workflow", "step2")}
  • mul: {"span": (<client id>, "my_workflow")}
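The append-on-enter, remove-on-exit behaviour can be sketched without Dask. This is only an illustration, not the proposed implementation: a contextvars variable stands in for the annotation state that dask.annotate manages, current_annotations() is a hypothetical helper, and the client id root tag is left out.

```python
import contextlib
import contextvars

# Stand-in for the annotation state managed by dask.annotate (assumption).
_current_span = contextvars.ContextVar("span", default=())


@contextlib.contextmanager
def span(tag):
    """Append ``tag`` to the active span tuple on enter; restore it on exit."""
    token = _current_span.set(_current_span.get() + (tag,))
    try:
        yield _current_span.get()
    finally:
        # Resetting the token removes the tag that was appended on enter.
        _current_span.reset(token)


def current_annotations():
    """Hypothetical helper: annotations a layer created right now would get."""
    tags = _current_span.get()
    return {"span": tags} if tags else {}
```

Because contextlib.contextmanager objects can also be used as decorators, this sketch supports both the @span("base_collection") and the with span("my_workflow") forms from the usage example.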

In the unhappy event that optimization strips away all task annotations, either the client or the scheduler will re-add the client id. This is a temporary hack - annotations should never be stripped to begin with.

Server side

Everything about spans can be encapsulated in a module that is separate from distributed.scheduler.

This module defines a new SchedulerPlugin, SpanPlugin, and a standalone class Span.

SpanPlugin contains the structures

  • SpanPlugin.spans: dict[tuple[str, ...], Span]
  • SpanPlugin.span_search: dict[str, list[Span]] - this is a convenience facility to make search of span tags O(1)

Each Span class contains links to its direct children plus arbitrary metadata.
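A rough sketch of the two structures, to make the shape concrete. The names SpanRegistry and ensure() are hypothetical, and indexing span_search by the last tag of each span is an assumption; the draft does not say which tags are indexed.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Span:
    name: tuple[str, ...]
    children: list[Span] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class SpanRegistry:
    """Hypothetical holder for the two lookup structures."""

    def __init__(self):
        self.spans: dict[tuple[str, ...], Span] = {}
        self.span_search: defaultdict[str, list[Span]] = defaultdict(list)

    def ensure(self, name):
        """Return the span called ``name``, creating it and its parents."""
        if name in self.spans:
            return self.spans[name]
        span = Span(name)
        self.spans[name] = span
        # Assumption: spans are searchable by their last (own) tag.
        self.span_search[name[-1]].append(span)
        if len(name) > 1:
            # Link the new span into its direct parent.
            self.ensure(name[:-1]).children.append(span)
        return span
```

Looking up a tag in span_search is a single dict access, which is what makes the search O(1) regardless of how many spans exist.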

When update_graph() receives a snippet of source code, the SpanPlugin posts it to all leaf Spans that are attached to the tasks.

Unlike Computation objects, Spans don't link to TaskGroups. Instead, individual tasks can contribute to them through SpanPlugin.transition(). This allows recording, e.g.:

  • current task counts by prefix and state
  • cumulative task counts by prefix
  • when the span was first submitted by the client
  • when the span was first received by the scheduler
  • when the first task of the span was sent to a worker
  • when the first task of the span started computing on a worker
  • when the last task of the span finished computing on a worker
  • when the last task of the span was forgotten by the scheduler
  • Discuss: what other notable events should be posted to the Span objects?

These pieces of information should be each separated into

  • an attribute, which records data for the span itself, i.e. only from tasks whose span annotation names it directly;
  • a property, which recursively aggregates the attribute of the span itself and the property of its children.
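The attribute/property split could look roughly like this for cumulative task counts by prefix. The attribute and property names are hypothetical; only the pattern is taken from the text above.

```python
from collections import Counter


class Span:
    def __init__(self, name):
        self.name = name
        self.children = []
        # Attribute: counts from tasks annotated with exactly this span.
        self.own_task_counts = Counter()

    @property
    def task_counts(self):
        """Own counts plus the recursively aggregated counts of all children."""
        total = Counter(self.own_task_counts)
        for child in self.children:
            total.update(child.task_counts)
        return total
```

The property is computed on access, so nothing has to be propagated up the tree when a task transitions; the cost is paid only when somebody reads the aggregate.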

Fine performance metrics

On workers, execute fine performance metrics (#7665 (comment)) shall change

from {("execute", prefix, activity, unit): value}
to {("execute", span, prefix, activity, unit): value}

e.g. from {("execute", "random", "thread-cpu", "seconds"): 12.3}
to {("execute", (<client id>, "my_workflow", "base_collection"), "random", "thread-cpu", "seconds"): 12.3}

This won't substantially change the cardinality of the data, unless a workflow creates and destroys many clients for some weird reason.

When metrics are transferred from worker to scheduler through the heartbeat,

  • execute metrics are apportioned to their leaf-level scheduler.Span object
  • Nothing special happens to gather_dep and get_data metrics

A @property on the Span objects allows recursively aggregating on the fly the metrics from children and grandchildren.

Note: the heartbeat may arrive when all tasks that point to a Span have already been forgotten.
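A sketch of how the scheduler side might apportion a heartbeat payload, assuming the key layout shown above. The function name apportion and the nested-dict storage are hypothetical. Entries are created on demand, so metrics that arrive after a span's tasks were forgotten are still recorded.

```python
from collections import defaultdict


def new_span_metrics():
    """Per-span metric store: {span: {("execute", prefix, activity, unit): value}}."""
    return defaultdict(lambda: defaultdict(float))


def apportion(heartbeat_metrics, span_metrics):
    """Route execute metrics to their leaf span; return everything else as-is."""
    passthrough = {}
    for key, value in heartbeat_metrics.items():
        if key[0] == "execute":
            # ("execute", span, prefix, activity, unit) -> store under the span
            _, span, *rest = key
            span_metrics[span][("execute", *rest)] += value
        else:
            # gather_dep and get_data metrics carry no span; nothing special.
            passthrough[key] = value
    return passthrough
```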

Lifecycle

Spans are modest in size and in most cases can just be retained indefinitely.
This is potentially problematic for long-lived servers which receive connections from many short-lived clients.

To solve this problem, we could define an expiration timeout for each Span, e.g. 6 hours, that starts ticking when the last task belonging to it is forgotten by the scheduler.
When a Span is forgotten, this should also clear all matching fine performance metrics from the scheduler and the workers.
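One possible shape for the expiration bookkeeping, purely as a sketch. The class SpanExpiry and its method names are hypothetical, and the clock is injectable so the behaviour can be checked without waiting.

```python
import time


class SpanExpiry:
    """Track when each span became idle and report the ones past the timeout."""

    def __init__(self, timeout=6 * 3600.0, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self._idle_since = {}

    def mark_idle(self, span_id):
        # Called when the last task of the span is forgotten by the scheduler.
        self._idle_since.setdefault(span_id, self.clock())

    def mark_active(self, span_id):
        # New tasks arrived for the span: stop the countdown.
        self._idle_since.pop(span_id, None)

    def expired(self):
        """Return and drop all spans that have been idle for >= timeout."""
        now = self.clock()
        out = [s for s, t in self._idle_since.items() if now - t >= self.timeout]
        for s in out:
            del self._idle_since[s]
        return out
```

The caller would be responsible for deleting the returned spans and clearing their fine performance metrics.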

Deprecation cycle

We would like to retire Computation objects.
Who uses them today? What kind of deprecation cycle do we need to adopt?

Implementation steps

(to be replaced with links to PRs / sub-issues)

  • [dask/dask] dask.span context manager
  • SpanPlugin and Span classes
  • post notable events to Span
  • clients to define a default span = (client_id, )
  • update_graph to force span=(client id, ) if span annotations have been stripped by optimization
  • propagate span context to tasks (to be used by worker clients)
  • Post execute fine performance metrics to spans
  • Deprecate Computation objects
  • Delete spans that received no activity for more than X hours

CC @fjetter @hendrikmakait @ntabris

@hendrikmakait
Member

Thanks for writing this up, @crusaderky. I'm really looking forward to adding spans! Given that we were not very happy with the term Computation, I'd like to bike-shed the term Span at some point. I'm not sure if I love or hate it due to its conceptual proximity to ordinary spans and traces in observability.

@fjetter
Member

fjetter commented May 25, 2023

This module defines a new SchedulerPlugin, SpanPlugin, and a standalone class Span.

I suggest not doing this as a plugin. Specifically, I would expect that we have to change the signature of update_graph for this to work, e.g.

update_graph(..., span: Span | dict, ...)

At this stage I would not include this in the plugin API (the signatures are not identical). I also expect the changes to tasks/spans/etc. to be somewhat entangled with the current TaskState creation logic, such that the plugin API may be a hindrance.

Unlike Computation objects, Spans don't link to TaskGroups. Instead, individual tasks can contribute to them through SpanPlugin.transition(). This allows recording, e.g.:

We haven't talked about this. I suggest not doing this in the first iteration and not doing anything with transitions. At this point I cannot tell how useful all of this additional information truly is, and I suggest keeping it simple at first.

Lifecycle

Again something I would defer to later. I agree that we might want to clean them up eventually but we're not doing this right now for Computations and we may not want to do this for Spans either. At this point I find it premature to deal with this.


Implementation steps

[dask/dask] dask.span context manager

It might be easier to start with this in dask.distributed. So far, I don't see the value in dask/dask beyond the import statement. We may want to change things along the way and keeping it in the same repo might be easier for now.

SpanPlugin and Span classes; post notable events to them

I suggest just starting with the Span and not thinking about the plugin right now. Let's keep the steps here simple, please. Introducing the span, initializing it on the client and submitting it to the scheduler will already be some work.

propagate span context to tasks (to be used by worker clients)

Unless this actually causes problems, I suggest delaying this as well. This is just a small fraction of use cases and I'd rather be fast with getting the basic infrastructure set up than deal with worker_clients.


Overall, I would strongly encourage us to take very small steps. There is still sufficient uncertainty without specifying that much so early.

@ntabris
Contributor

ntabris commented May 25, 2023

A span is defined by the client using a context manager

I feel like a lot of the "easy way to define a higher-level intelligible unit of work" could also be useful for instrumenting library code, so I'd be inclined to have a design that allows spans to be defined by client, but also allows spans to be defined (say) inside dask code. If we want to leave that open, that probably affects some implementation decisions.

@jacobtomlinson
Member

jacobtomlinson commented May 25, 2023

Sounds great! I can imagine a couple of use cases where I would find this useful.

  1. Tracking stages in a workload. For example if I have a suite of benchmarks it would be nice to track where each one starts/ends.
  2. Tracking smaller stages within a computation. Perhaps something like trials in an XGBoost HPO workload. I think this is what @ntabris is suggesting.

I love the idea of nesting them.

I would be keen to chat further about how we can expose this data. Could we show this on the Dask dashboard? Can we export this to Grafana? Could we include something in a performance report?

@ntabris
Contributor

ntabris commented May 25, 2023

@jacobtomlinson yes, those are exactly use cases that we're thinking about. Another related potential use is being able to attribute the "fine metrics" (that @crusaderky has been working on) to user-intelligible units of work.

@crusaderky crusaderky self-assigned this May 26, 2023
@crusaderky
Collaborator Author

Following further discussion:

  • Dropped the design based on SchedulerPlugin; will use instead a thin wrapper on top of TaskGroups much like Computations already do.
  • Dropped the client id as a root span in order not to have a default annotation on every task. Only workflows that actually define the span context manager will have a span annotation; all others will be logged under a hardcoded ("default", ) span. Multitenancy users will need to explicitly call span() on the client if they want to make any sense of the workload.

@crusaderky
Collaborator Author

Following further discussion:

  • Spans now have a unique ID in addition to their mnemonic name.
  • If you open and close a span with the same name (e.g. you call a library twice in a for loop), you are effectively obtaining two separate spans.
  • The default span now closes and reopens itself automatically when there's no activity, exactly like computations do. Explicitly named spans don't do it.

implementation in #7882.
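To illustrate the unique-ID point, here is a minimal sketch that is not taken from #7882. The Span and Spans classes and the open() method are hypothetical, and the fallback to the hardcoded ("default", ) name stands in for tasks that carry no span annotation.

```python
import uuid
from dataclasses import dataclass, field

DEFAULT = ("default",)


@dataclass
class Span:
    name: tuple
    # A fresh ID per instance: two spans can share a name but never an ID.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


class Spans:
    """Hypothetical registry keyed by span ID rather than by name."""

    def __init__(self):
        self.by_id = {}

    def open(self, *name):
        # No tags given means the work is logged under the default span name.
        span = Span(name or DEFAULT)
        self.by_id[span.id] = span
        return span
```

Calling open("my_library") twice, for instance in a for loop, registers two entries with the same mnemonic name and different IDs.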

@crusaderky
Collaborator Author

crusaderky commented Jun 7, 2023

I would like to set the definition-of-done of this issue to:

I'd like to keep these out of scope for now and open them as follow-up issues:

  • Filter Fine Performance Metrics bokeh dashboard by span
  • Advertise active spans on Prometheus
  • Propagate span context to tasks (to be used by worker clients)
  • Delete spans that received no activity for more than X hours

@ntabris Could you play with #7885 and confirm everything is as you expected for the purpose of integrating with the Coiled dashboard?

@hendrikmakait
Member

hendrikmakait commented Jun 7, 2023

The definition-of-done seems mostly fine to me. I think I'd prefer to deprecate computations once

Propagate span context to tasks (to be used by worker clients)

has been implemented. This feels like the one additional step needed to make spans feel coherent across the board. I don't care about the deprecation/propagation being part of the DoD though.

@crusaderky
Collaborator Author

Unless you explicitly set a span, worker clients will add tasks to the currently running default span though - there's no regression from computations.

@hendrikmakait
Member

True, there's no regression, but to me, deprecating computations means promoting spans instead. For this, I'd prefer steps to be coherently propagated for a smooth UX. We could also promote default spans only, that'd be fine for me as well.
