Dynamic annotations #5207

Draft: madsbk wants to merge 7 commits into main from dynamic_annotations
Conversation

@madsbk (Contributor) commented on Aug 12, 2021

As part of the Heterogeneous Computing Design discussion, this PR implements dynamic annotations, inspired by @mrocklin's option 4 in #4656.
The idea is that a user can specify a function that updates the annotations and restrictions of a task after its execution:

  1. The worker runs the user-specified annotation functions on the task output.
  2. The worker sends the updated attributes back to the scheduler using a new "annotate-task" message.
  3. The scheduler updates the attributes of the task and all tasks depending on it.
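
For illustration, a minimal sketch of the worker-side send in step 2; the exact field names of the "annotate-task" payload are an assumption, not this PR's final wire format:

# Hypothetical "annotate-task" payload (field names are assumptions),
# sent over the worker's batched stream after the annotator runs.
self.batched_stream.send({
    "op": "annotate-task",
    "key": ts.key,                                      # the finished task
    "annotations": ts.annotations,                      # e.g. {"executor": "gpu"}
    "resource_restrictions": ts.resource_restrictions,
})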

The following is an example of a dynamic annotation function that sets the worker executor to "gpu" when the task output is a CUDA device object. Together with a default GPU executor (#5084), this ensures that all tasks downstream of the first CUDA task use the GPU executor.

import dask_cuda  # provides is_device_object()
from distributed.worker import TaskState

def set_gpu_executor(ts: TaskState, value: object) -> bool:
    # Pin the task to the "gpu" executor when its output is a CUDA device object.
    if dask_cuda.is_device_object(value):
        ts.annotations["executor"] = "gpu"
        return True
    return False

Notice that this will not annotate the first GPU task itself; only its dependent tasks are annotated automatically. The user can of course still annotate tasks manually.

  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

@madsbk force-pushed the dynamic_annotations branch from 5247116 to c56bc9b on August 12, 2021
@madsbk force-pushed the dynamic_annotations branch from c56bc9b to dae7d59 on August 16, 2021
worker=None,
key: str = None,
annotations: Mapping = None,
resource_restrictions: Mapping = None,
Member

I recommend making restrictions a separate route. I don't think that there is any reason for annotations and restrictions to be intertwined.

However, I do think that it would be useful to support a variety of different kinds of restrictions. You should probably roll this into the Scheduler.set_restrictions method. I think that it has a worker= keyword today as a hint that host= and resource= should come later.
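
For concreteness, a sketch of what a widened route could look like, assuming the worker= keyword that exists today and treating host= and resource= as the hypothetical future keywords hinted at above:

def set_restrictions(self, comm=None, worker: dict = None, host: dict = None, resource: dict = None):
    # worker= exists today; host= and resource= are hypothetical here.
    for key, workers in (worker or {}).items():
        ts = self.tasks[key]
        ts.worker_restrictions = set(workers)
    # host= and resource= handling would follow the same pattern,
    # writing ts.host_restrictions and ts.resource_restrictions.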

@madsbk (Contributor, Author) commented on Aug 19, 2021

Good point.
A related question: what is the difference between regular handlers and stream handlers?
Can I use self.batched_stream.send for an "op": "set_restrictions" message when set_restrictions is in Scheduler.handlers?

Member

Stream handlers are used with BatchedSend objects. They are strictly fire-and-forget and very cheap to send: we accumulate several small messages in a list and then send that list off periodically (with a very short period).

The handlers are what receive the await rpc.foo(...) commands. They wait for and collect a response.

If you need a response, use await rpc.foo(...). If you want to send lots of small messages without much overhead, use a stream handler. You can certainly put a method in both stream handlers and handlers, although if the caller needs a response this might require some cleverness.

Does that provide enough information?
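
A sketch of the two patterns from the sender's side, assuming the batched_stream and scheduler RPC handles a worker already holds, with set_restrictions as the example op:

# Fire-and-forget: batched with other small messages, dispatched via
# the receiver's stream_handlers, no response comes back.
self.batched_stream.send({"op": "set_restrictions", "worker": {key: workers}})

# Request/response: dispatched via the receiver's handlers, awaits a reply.
response = await self.scheduler.set_restrictions(worker={key: workers})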

@madsbk (Contributor, Author)

Yes, thanks!

key: str = None,
annotations: Mapping = None,
resource_restrictions: Mapping = None,
annotate_dependents=True,
Member

I'm curious about the motivation behind this keyword. Can you expand here?

@madsbk (Contributor, Author)

I am considering whether an annotator function should be able to enable/disable annotation of a task's dependents (see the sketch below). Alternatively, we could always apply annotations to task dependents.
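
For illustration, one way the flag could surface in the new message; annotate_dependents=False here is hypothetical usage, not settled API:

self.batched_stream.send({
    "op": "annotate-task",
    "key": ts.key,
    "annotations": ts.annotations,
    "annotate_dependents": False,  # update only this task, not its dependents
})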

Member

I agree that it's possibly in scope. I'm curious whether there are active use cases for it today that you have in mind. If so, I would be curious to hear them (for example, maybe the results of gpu tasks are likely to also be gpu tasks). If not, then I would recommend waiting until we know more.

My gut reaction is not to do this until we have a motivating use case.

@madsbk (Contributor, Author) commented on Aug 16, 2021

for example, maybe the results of gpu tasks are likely to also be gpu tasks

Yes, to implement automatic use of a GPU ThreadPoolExecutor (#5084). If the initial array or dataframe creation is annotated with the GPU executor, all the following operations are as well.

This will also be very helpful when mixing data types. By tracking the GPU use of each individual partition, we can utilize GPU and CPU workers simultaneously.

@madsbk force-pushed the dynamic_annotations branch from 519ce1c to 8ad9cee on August 19, 2021
@madsbk force-pushed the dynamic_annotations branch from 449ca4b to 7494179 on August 19, 2021
@mrocklin (Member) left a comment

Some small comments. In general this looks fine to me though.

}
res1, res2 = c.get(dsk, ["g1", "g2"], sync=False, asynchronous=True)
assert "Executor1" in await res1
assert "Executor2" in await res2
Member

The fact that you had to use get/compute here is interesting. There are interesting challenges with doing this with futures / dependencies that might change. I still think that this is a useful feature to have, but it seems like this approach might not be comprehensive if we want to solve things across update_graph calls. Agree or disagree?

@madsbk (Contributor, Author)

Agree! I hadn't thought about this issue before today. As you say, it is still useful but I think we should put this PR on hold for a bit and see if we can come up with a better approach.


# Separate annotation updates into two cases:
a1 = {} # when the task's dependents should also be updated
a2 = {} # when only the task itself should be updated
Member

Do you have thoughts on better names here?

@madsbk (Contributor, Author)

Heh, yeah I can do that :)
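
For instance, one possible renaming (the names are just a suggestion):

# Separate annotation updates into two cases:
annotations_with_dependents = {}  # also applied to the task's dependents
annotations_task_only = {}        # applied to the task itself only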
