This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Dask annotations not working when connecting to an existing cluster #47

Closed
andreas-ntonas opened this issue Nov 1, 2022 · 10 comments


andreas-ntonas commented Nov 1, 2022

I have noticed that Prefect does not respect Dask resource annotations when connecting to an existing cluster.

Minimal reproducible example:

I have set up a local distributed cluster like this:

First I spin up a local scheduler:

dask scheduler

Then I start 2 workers. The first one has 1 thread and 1 "annotation" resource available, and the other has 3 threads and no special resources:

dask worker --nthreads '1' --resources "annotation=1"  tcp://192.168.1.13:8786 --name special_worker

&

dask worker --nthreads '3'  tcp://192.168.1.13:8786 --name only_regular_tasks

Suppose we have the following flow, which is executed 6 times and submits its tasks to Dask for execution:

#!/usr/bin/env python3
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from time import sleep

@task
def show_number(x: int):
    print(x)

@task
def say_hello():
    print("Hello!")
    sleep(4)

@flow(task_runner=DaskTaskRunner(address="tcp://192.168.1.13:8786"))
def example_flow(x):
    show_number.submit(x)
    with dask.annotate(resources={'annotation': 1}):
        say_hello.submit()

if __name__ == "__main__":
    for i in range(6):
        example_flow(i)

I would expect the say_hello() tasks to execute only on special_worker and the show_number() tasks to run on whichever worker is available.

However, this is not the case and I get:

On the 1st special worker:

dask worker --nthreads '1' --resources "annotation=1"  tcp://192.168.1.13:8786 --name special_worker
2022-11-02 01:40:21,707 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.1.13:40111'
2022-11-02 01:40:21,911 - distributed.worker - INFO -       Start worker at:   tcp://192.168.1.13:38027
2022-11-02 01:40:21,911 - distributed.worker - INFO -          Listening to:   tcp://192.168.1.13:38027
2022-11-02 01:40:21,911 - distributed.worker - INFO -           Worker name:             special_worker
2022-11-02 01:40:21,911 - distributed.worker - INFO -          dashboard at:         192.168.1.13:44211
2022-11-02 01:40:21,911 - distributed.worker - INFO - Waiting to connect to:    tcp://192.168.1.13:8786
2022-11-02 01:40:21,911 - distributed.worker - INFO - -------------------------------------------------
2022-11-02 01:40:21,911 - distributed.worker - INFO -               Threads:                          1
2022-11-02 01:40:21,911 - distributed.worker - INFO -                Memory:                   1.92 GiB
2022-11-02 01:40:21,911 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-5zbq1zho
2022-11-02 01:40:21,911 - distributed.worker - INFO - -------------------------------------------------
2022-11-02 01:40:21,915 - distributed.worker - INFO -         Registered to:    tcp://192.168.1.13:8786
2022-11-02 01:40:21,915 - distributed.worker - INFO - -------------------------------------------------
2022-11-02 01:40:21,916 - distributed.core - INFO - Starting established connection
0
Hello!
Hello!
3
Hello!
Hello!

On the 2nd only_regular_tasks worker:

dask worker --nthreads '3'  tcp://192.168.1.13:8786 --name only_regular_tasks
2022-11-02 01:40:44,071 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.1.13:37631'
2022-11-02 01:40:44,271 - distributed.worker - INFO -       Start worker at:   tcp://192.168.1.13:41407
2022-11-02 01:40:44,271 - distributed.worker - INFO -          Listening to:   tcp://192.168.1.13:41407
2022-11-02 01:40:44,271 - distributed.worker - INFO -           Worker name:         only_regular_tasks
2022-11-02 01:40:44,271 - distributed.worker - INFO -          dashboard at:         192.168.1.13:43921
2022-11-02 01:40:44,271 - distributed.worker - INFO - Waiting to connect to:    tcp://192.168.1.13:8786
2022-11-02 01:40:44,271 - distributed.worker - INFO - -------------------------------------------------
2022-11-02 01:40:44,271 - distributed.worker - INFO -               Threads:                          3
2022-11-02 01:40:44,271 - distributed.worker - INFO -                Memory:                   5.76 GiB
2022-11-02 01:40:44,271 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-b0x7pdsc
2022-11-02 01:40:44,271 - distributed.worker - INFO - -------------------------------------------------
2022-11-02 01:40:44,276 - distributed.worker - INFO -         Registered to:    tcp://192.168.1.13:8786
2022-11-02 01:40:44,276 - distributed.worker - INFO - -------------------------------------------------
2022-11-02 01:40:44,276 - distributed.core - INFO - Starting established connection
Hello!
1
2
Hello!
4
5

As you can see, the say_hello() tasks are being executed on the 2nd worker, which does not have the --resources "annotation=1" resource available.

Relevant versions used in a Python 3.10 env: prefect==2.6.5, prefect-dask==0.2.1, dask==2022.10.2, distributed==2022.10.2


zanieb commented Nov 2, 2022

Ah, this used to work and was the intended way to use annotations. I think this may be caused by PrefectHQ/prefect#6527, though it'll require further investigation.

@BitTheByte

@madkinsz I also encountered this while using dask_kubernetes to create a Dask cluster. Some tasks have their correct annotations and others don't have any; it seems like there is a problem with the context manager.


zanieb commented Dec 21, 2022

We made an optimization to task submission, but tasks are now submitted in the background, which means submission can actually occur outside of the annotation context. I'm working on a fix.
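
To illustrate the mechanism with a Prefect-free sketch (the annotate helper below is a stand-in for illustration, not Dask's actual implementation): the annotations only exist while the with block is active, so a submission that is deferred to the background runs after the block has exited and sees nothing.

import asyncio
import contextlib

ACTIVE_ANNOTATIONS: dict = {}

@contextlib.contextmanager
def annotate(**annotations):
    # Annotations are only "active" inside the block, similar to dask.annotate.
    ACTIVE_ANNOTATIONS.update(annotations)
    try:
        yield
    finally:
        for key in annotations:
            ACTIVE_ANNOTATIONS.pop(key, None)

async def background_submit(label: str):
    # Runs on the event loop only after flow_body has left the with block.
    await asyncio.sleep(0)
    print(label, "sees annotations:", dict(ACTIVE_ANNOTATIONS))

async def flow_body():
    with annotate(resources={"annotation": 1}):
        # Eager submission reads the annotations while the block is active.
        print("eager submit sees annotations:", dict(ACTIVE_ANNOTATIONS))
        # Deferred submission is only scheduled here; it executes later.
        deferred = asyncio.create_task(background_submit("deferred submit"))
    await deferred  # prints an empty dict: the annotation context is gone

asyncio.run(flow_body())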

@BitTheByte

@madkinsz do you mind sharing the PR? I'd like to keep track of this issue and provide help if needed


tibuch commented Jun 30, 2023

I just encountered this issue as well. I am using this config for the DaskTaskRunner:

DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "account": "...",
        "queue": "...",
        "cores": 5,
        "processes": 1,
        "memory": "5 GB",
        "walltime": "24:00:00",
        "worker_extra_args": [
            "--resources TEST=1",
        ],
    },
)

And then submit tasks with:

with dask.annotate(resources={"TEST": 1}):
    for i in range(10):
        task.submit(i)

This executes 5 tasks simultaneously on the same worker. As far as I can tell, the limiting factor is the number of cores, because if I set "cores": 1 only a single task runs at a time.
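
If it helps narrow things down, a plain-Dask check of the worker-level resource limit (no Prefect involved) looks like this; the scheduler address and sleepy_task below are placeholders rather than my actual code:

from time import sleep

from distributed import Client

def sleepy_task(i: int) -> int:
    sleep(5)
    return i

# Placeholder address; point this at the SLURMCluster's scheduler.
client = Client("tcp://scheduler-address:8786")

# Each future declares that it needs one unit of the "TEST" resource, so a
# worker started with --resources TEST=1 should run them one at a time,
# regardless of how many threads ("cores") it has.
futures = [
    client.submit(sleepy_task, i, resources={"TEST": 1}, pure=False)
    for i in range(10)
]
print(client.gather(futures))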

@Jrodseth

Jrodseth commented Aug 1, 2023

I'm also experiencing this problem with temporary clusters on ECS. Workers can be started with resources configured as @tibuch showed, using worker_extra_args in the cluster_kwargs, e.g.

"worker_extra_args": [ "--resources", "MEMORY=4098"]

But using with dask.annotate(resources={"MEMORY": 4098}): doesn't prevent multiple tasks from being submitted to the workers.

@chrisguidry chrisguidry self-assigned this Sep 13, 2023
@chrisguidry

Hi @andreas-ntonas, thank you for the very detailed reproduction steps.

I've been working on reproducing this issue, but in my testing with the latest prefect and prefect-dask I'm not able to reproduce it, and I believe I'm seeing the behavior you were expecting.

On the special worker:

$ dask worker --nthreads '1' --resources "annotation=1"  tcp://10.0.0.196:8786 --name special_worker
2023-09-13 15:57:23,709 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.0.0.196:35097'
2023-09-13 15:57:23,927 - distributed.worker - INFO -       Start worker at:     tcp://10.0.0.196:34437
2023-09-13 15:57:23,927 - distributed.worker - INFO -          Listening to:     tcp://10.0.0.196:34437
2023-09-13 15:57:23,927 - distributed.worker - INFO -           Worker name:             special_worker
2023-09-13 15:57:23,927 - distributed.worker - INFO -          dashboard at:           10.0.0.196:42291
2023-09-13 15:57:23,927 - distributed.worker - INFO - Waiting to connect to:      tcp://10.0.0.196:8786
2023-09-13 15:57:23,927 - distributed.worker - INFO - -------------------------------------------------
2023-09-13 15:57:23,927 - distributed.worker - INFO -               Threads:                          1
2023-09-13 15:57:23,927 - distributed.worker - INFO -                Memory:                   3.13 GiB
2023-09-13 15:57:23,927 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-uwkqglq5
2023-09-13 15:57:23,927 - distributed.worker - INFO - -------------------------------------------------
2023-09-13 15:57:23,932 - distributed.worker - INFO - Starting Worker plugin shuffle
2023-09-13 15:57:23,932 - distributed.worker - INFO -         Registered to:      tcp://10.0.0.196:8786
2023-09-13 15:57:23,932 - distributed.worker - INFO - -------------------------------------------------
2023-09-13 15:57:23,933 - distributed.core - INFO - Starting established connection to tcp://10.0.0.196:8786
Hello!
16:00:12.727 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:00:21.908 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:00:30.204 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:00:38.761 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:00:47.142 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:00:55.426 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:02:41.596 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:02:50.030 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:03:03.527 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:03:11.621 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:03:21.032 | INFO    | Task run 'say_hello-0' - Finished in state Completed()
Hello!
16:03:29.293 | INFO    | Task run 'say_hello-0' - Finished in state Completed()

On the regular worker:

$ dask worker --nthreads '3' tcp://10.0.0.196:8786 --name only_regular_tasks
2023-09-13 15:57:56,149 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.0.0.196:45717'
2023-09-13 15:57:56,398 - distributed.worker - INFO -       Start worker at:     tcp://10.0.0.196:34377
2023-09-13 15:57:56,398 - distributed.worker - INFO -          Listening to:     tcp://10.0.0.196:34377
2023-09-13 15:57:56,398 - distributed.worker - INFO -           Worker name:         only_regular_tasks
2023-09-13 15:57:56,398 - distributed.worker - INFO -          dashboard at:           10.0.0.196:42241
2023-09-13 15:57:56,398 - distributed.worker - INFO - Waiting to connect to:      tcp://10.0.0.196:8786
2023-09-13 15:57:56,398 - distributed.worker - INFO - -------------------------------------------------
2023-09-13 15:57:56,398 - distributed.worker - INFO -               Threads:                          3
2023-09-13 15:57:56,398 - distributed.worker - INFO -                Memory:                   9.40 GiB
2023-09-13 15:57:56,398 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-sz_hhz3t
2023-09-13 15:57:56,398 - distributed.worker - INFO - -------------------------------------------------
2023-09-13 15:57:56,402 - distributed.worker - INFO - Starting Worker plugin shuffle
2023-09-13 15:57:56,403 - distributed.worker - INFO -         Registered to:      tcp://10.0.0.196:8786
2023-09-13 15:57:56,403 - distributed.worker - INFO - -------------------------------------------------
2023-09-13 15:57:56,403 - distributed.core - INFO - Starting established connection to tcp://10.0.0.196:8786
0
16:00:08.917 | INFO    | Task run 'show_number-0' - Finished in state Completed()
1
16:00:17.455 | INFO    | Task run 'show_number-0' - Finished in state Completed()
2
16:00:26.501 | INFO    | Task run 'show_number-0' - Finished in state Completed()
3
16:00:34.491 | INFO    | Task run 'show_number-0' - Finished in state Completed()
4
16:00:43.324 | INFO    | Task run 'show_number-0' - Finished in state Completed()
5
16:00:51.269 | INFO    | Task run 'show_number-0' - Finished in state Completed()
0
16:02:32.554 | INFO    | Task run 'show_number-0' - Finished in state Completed()
1
16:02:46.101 | INFO    | Task run 'show_number-0' - Finished in state Completed()
2
16:02:59.269 | INFO    | Task run 'show_number-0' - Finished in state Completed()
3
16:03:07.540 | INFO    | Task run 'show_number-0' - Finished in state Completed()
4
16:03:16.945 | INFO    | Task run 'show_number-0' - Finished in state Completed()
5
16:03:25.199 | INFO    | Task run 'show_number-0' - Finished in state Completed()

I know it's been some time since the original report; would you be willing to test this again using these versions:

  • prefect==2.13.0
  • prefect-dask==0.2.5
  • dask==2023.9.1
  • distributed==2023.9.1

And then let me know if you're still having the same trouble?

@chrisguidry

I'm going to close this one out as fixed, as we're unable to reproduce this on our end. There were a LOT of improvements to how tasks are handled between November and today, and I believe this was fixed as a side-effect.

If anyone is still having trouble with this, please open a new issue with repro steps.

@BitTheByte

Hey @chrisguidry

Although this is fixed, there is a race condition somewhere, as I see different tasks picking up other tasks' annotations. I can't really provide a reproducible example due to the nature of the problem, but it occurs in flows that look like the one below:

with annotate(priority=1000):
    task1.map(...)

with annotate(priority=-1000):
    task2.map(...)
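
A fleshed-out, hypothetical version of that shape, in case it helps (the task bodies, names, and the default task runner are placeholders, not my real flow):

import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def high_priority_work(x: int) -> int:
    return x * 2

@task
def low_priority_work(x: int) -> int:
    return x + 1

# A temporary local cluster stands in for the real one here.
@flow(task_runner=DaskTaskRunner())
def mixed_priority_flow():
    # Each group of mapped tasks should carry only its own annotation.
    with dask.annotate(priority=1000):
        high_priority_work.map(range(50))
    with dask.annotate(priority=-1000):
        low_priority_work.map(range(50))

if __name__ == "__main__":
    mixed_priority_flow()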

@chrisguidry

@BitTheByte thanks for the feedback. Can you open a fresh issue with all the details you have available (prefect* and dask* versions, the behavior you're seeing and what you expect, etc.)?
