Store ready and constrained tasks in heapsets #6711

Merged · 3 commits · Jul 13, 2022
Changes from 1 commit
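For context: this PR moves the worker state machine's ready and constrained task collections into heap-backed sets, so the worker can both pop the highest-priority task cheaply and answer membership tests such as "ts in ws.ready" in O(1). The toy sketch below only illustrates the idea; MiniHeapSet and its methods are made up for this page and are not the project's actual HeapSet implementation.

import heapq

class MiniHeapSet:
    """Toy heap-backed set: O(log n) add/pop by priority, O(1) membership."""

    def __init__(self, *, key):
        self._key = key        # callable returning a sortable priority for an item
        self._heap = []        # entries of (priority, insertion order, item)
        self._members = set()  # live items, used for fast `in` checks
        self._count = 0        # tie-breaker so items themselves are never compared

    def add(self, item):
        if item in self._members:
            return
        self._members.add(item)
        self._count += 1
        heapq.heappush(self._heap, (self._key(item), self._count, item))

    def discard(self, item):
        # Lazy deletion: remove from the set now, skip stale heap entries in peek().
        self._members.discard(item)

    def __contains__(self, item):
        return item in self._members

    def __len__(self):
        return len(self._members)

    def __bool__(self):
        return bool(self._members)

    def peek(self):
        # Highest-priority live item (lowest priority tuple), without removing it.
        while self._heap[0][2] not in self._members:
            heapq.heappop(self._heap)  # drop entries discarded earlier
        return self._heap[0][2]

    def pop(self):
        item = self.peek()
        self._members.discard(item)
        heapq.heappop(self._heap)
        return item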
136 changes: 123 additions & 13 deletions distributed/tests/test_resources.py
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
-from time import time

import pytest

@@ -12,6 +11,13 @@
from distributed import Lock, Worker
from distributed.client import wait
from distributed.utils_test import gen_cluster, inc, lock_inc, slowadd, slowinc
from distributed.worker_state_machine import (
ComputeTaskEvent,
Execute,
ExecuteSuccessEvent,
FreeKeysEvent,
TaskFinishedMsg,
)


@gen_cluster(
@@ -235,20 +241,124 @@ async def test_minimum_resource(c, s, a):
assert a.state.total_resources == a.state.available_resources


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 2, {"resources": {"A": 1}})])
-async def test_prefer_constrained(c, s, a):
-futures = c.map(slowinc, range(1000), delay=0.1)
-constrained = c.map(inc, range(10), resources={"A": 1})
@pytest.mark.parametrize("swap", [False, True])
@pytest.mark.parametrize("p1,p2,expect_key", [(1, 0, "y"), (0, 1, "x")])
def test_constrained_vs_ready_priority_1(ws, p1, p2, expect_key, swap):
"""If there are both ready and constrained tasks, those with the highest priority
win (note: on the Worker, priorities have their sign inverted)
"""
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

-start = time()
-await wait(constrained)
-end = time()
-assert end - start < 4
-assert (
-len([ts for ts in s.tasks.values() if ts.state == "memory"])
-<= len(constrained) + 2
ws.handle_stimulus(ComputeTaskEvent.dummy(key="clog", stimulus_id="clog"))

stimuli = [
ComputeTaskEvent.dummy("x", priority=(p1,), stimulus_id="s1"),
ComputeTaskEvent.dummy("y", priority=(p2,), **RR, stimulus_id="s2"),
]
if swap:
stimuli = stimuli[::-1] # This must be inconsequential

instructions = ws.handle_stimulus(
*stimuli,
ExecuteSuccessEvent.dummy("clog", stimulus_id="s3"),
)
assert instructions == [
TaskFinishedMsg.match(key="clog", stimulus_id="s3"),
Execute(key=expect_key, stimulus_id="s3"),
]
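In plain terms, the behaviour pinned down here: when an execution slot frees up, the worker compares the head of the ready heap with the head of the constrained heap and runs whichever has the better priority, considering the constrained candidate only if its resources are currently available. A rough sketch of that decision follows; next_task_to_execute and its arguments are illustrative names, not the worker state machine's real method.

def next_task_to_execute(ready, constrained, available_resources):
    # Sketch only: choose between the best ready task and the best constrained task.
    candidate = ready.peek() if ready else None
    if constrained:
        ts = constrained.peek()
        fits = all(
            available_resources.get(resource, 0) >= amount
            for resource, amount in ts.resource_restrictions.items()
        )
        # A lower tuple means a higher priority, because signs are inverted on the worker.
        if fits and (candidate is None or ts.priority < candidate.priority):
            candidate = ts
    return candidate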


@pytest.mark.parametrize("swap", [False, True])
@pytest.mark.parametrize("p1,p2,expect_key", [(1, 0, "y"), (0, 1, "x")])
def test_constrained_vs_ready_priority_2(ws, p1, p2, expect_key, swap):
"""If there are both ready and constrained tasks, but not enough available
resources, priority is inconsequential - the tasks in the ready queue are picked up.
"""
ws.nthreads = 2
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

ws.handle_stimulus(
ComputeTaskEvent.dummy(key="clog1", stimulus_id="clog1"),
ComputeTaskEvent.dummy(key="clog2", **RR, stimulus_id="clog2"),
)

# Test that both priorities and order are inconsequential
stimuli = [
ComputeTaskEvent.dummy("x", priority=(p1,), stimulus_id="s1"),
ComputeTaskEvent.dummy("y", priority=(p2,), **RR, stimulus_id="s2"),
]
if swap:
stimuli = stimuli[::-1]

instructions = ws.handle_stimulus(
*stimuli,
ExecuteSuccessEvent.dummy("clog1", stimulus_id="s3"),
)
assert instructions == [
TaskFinishedMsg.match(key="clog1", stimulus_id="s3"),
Execute(key="x", stimulus_id="s3"),
]
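A tiny usage example of the sketch above, covering both cases exercised by these two tests. FakeHeap and the SimpleNamespace tasks are throwaway stand-ins rather than dask classes, and next_task_to_execute is the illustrative helper from the previous sketch.

from types import SimpleNamespace

class FakeHeap(list):
    def peek(self):
        return min(self, key=lambda ts: ts.priority)

x = SimpleNamespace(key="x", priority=(1,), resource_restrictions={})
y = SimpleNamespace(key="y", priority=(0,), resource_restrictions={"R": 1})

# While another task still holds the only unit of "R", the constrained head cannot
# run, so the unconstrained ready task wins despite its worse priority:
assert next_task_to_execute(FakeHeap([x]), FakeHeap([y]), {"R": 0}).key == "x"
# Once "R" is available again, priority decides:
assert next_task_to_execute(FakeHeap([x]), FakeHeap([y]), {"R": 1}).key == "y"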


def test_constrained_tasks_respect_priority(ws):
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy(key="clog", **RR, stimulus_id="clog"),
ComputeTaskEvent.dummy(key="x1", priority=(1,), **RR, stimulus_id="s1"),
ComputeTaskEvent.dummy(key="x2", priority=(2,), **RR, stimulus_id="s2"),
ComputeTaskEvent.dummy(key="x3", priority=(0,), **RR, stimulus_id="s3"),
ExecuteSuccessEvent.dummy(key="clog", stimulus_id="s4"), # start x3
ExecuteSuccessEvent.dummy(key="x3", stimulus_id="s5"), # start x1
ExecuteSuccessEvent.dummy(key="x1", stimulus_id="s6"), # start x2
)
assert instructions == [
Execute(key="clog", stimulus_id="clog"),
TaskFinishedMsg.match(key="clog", stimulus_id="s4"),
Execute(key="x3", stimulus_id="s4"),
TaskFinishedMsg.match(key="x3", stimulus_id="s5"),
Execute(key="x1", stimulus_id="s5"),
TaskFinishedMsg.match(key="x1", stimulus_id="s6"),
Execute(key="x2", stimulus_id="s6"),
]
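The same ordering can be reproduced on the toy MiniHeapSet sketched near the top of this page (again illustrative; the test above exercises the real state machine):

from types import SimpleNamespace

h = MiniHeapSet(key=lambda ts: ts.priority)
for key, prio in [("x1", (1,)), ("x2", (2,)), ("x3", (0,))]:
    h.add(SimpleNamespace(key=key, priority=prio))

# Tasks come out lowest priority tuple first, matching the x3, x1, x2 order above:
assert [h.pop().key for _ in range(3)] == ["x3", "x1", "x2"]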


def test_task_cancelled_and_readded_with_resources(ws):
"""See https://github.com/dask/distributed/issues/6710

A task is enqueued without resources, then cancelled by the client, then re-added
with the same key, this time with resources.
Test that resources are respected.
"""
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

ws.handle_stimulus(
ComputeTaskEvent.dummy(key="clog", **RR, stimulus_id="s1"),
ComputeTaskEvent.dummy(key="x", stimulus_id="s2"),
)
ts = ws.tasks["x"]
assert ts.state == "ready"
assert ts in ws.ready
assert ts not in ws.constrained
assert ts.resource_restrictions == {}

ws.handle_stimulus(
FreeKeysEvent(keys=["x"], stimulus_id="clog"),
ComputeTaskEvent.dummy(key="x", **RR, stimulus_id="s2"),
)
-assert s.workers[a.address].processing
ts = ws.tasks["x"]
assert ts.state == "constrained"
assert ts not in ws.ready
assert ts in ws.constrained
assert ts.resource_restrictions == {"R": 1}
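The regression in #6710 boils down to an invariant between a task's state, which queue it sits in, and its resource restrictions. Written as a standalone check it might look like the sketch below, which only uses attributes already visible in the test above; the project's own validation code is separate and may differ.

def check_queue_invariant(ws, key):
    # Sketch: a task must sit in exactly the queue matching its state, and its
    # resource restrictions must be consistent with that queue.
    ts = ws.tasks[key]
    if ts.state == "ready":
        assert ts in ws.ready and ts not in ws.constrained
        assert not ts.resource_restrictions
    elif ts.state == "constrained":
        assert ts in ws.constrained and ts not in ws.ready
        assert ts.resource_restrictions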


@pytest.mark.skip(reason="")
10 changes: 4 additions & 6 deletions distributed/tests/test_steal.py
@@ -1063,12 +1063,12 @@ async def test_steal_concurrent_simple(c, s, *workers):
await asyncio.sleep(0.1)

# ready is a heap but we don't need last, just not the next
-_, victim_key = w0.state.ready[-1]
victim_key = w0.state.ready.peek().key
Collaborator:
The comment above would make me think peek isn't right for these (since it is the next). Seems like this test is relying on race conditions already (that move_task_request happens faster than slowinc); picking the first task might significantly increase the probability of those race conditions being triggered. I would think assert ws1.has_what may fail intermittently if ws0 starts executing victim_key before move_task_request lands?

In #6598 I added a HeapSet.popright method which might be useful here: https://github.com/dask/distributed/pull/6598/files#diff-8d01f1d2f0aff66e9a55d31f036edddb0c44d82c8bfcc1eefc015b9ac8d661edR100-R109. Note that this isn't a PR I'd ever expect to merge, so not sure if popright would ever have any usage beyond this test—just mentioning that it's come up once before.

Collaborator Author:
Done, thanks for spotting it!
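For reference, the suggestion amounts to picking a steal victim from the tail of the heap rather than its head, so the victim is unlikely to start executing before the steal request lands. On the toy MiniHeapSet sketched near the top of this page, such a helper could look roughly like this; the real HeapSet.popright in the linked PR may be implemented differently.

def popright(hs):
    # Pop a live item from the tail of the internal heap list: not necessarily the
    # lowest-priority task, but guaranteed not to be the next one to run. Removing
    # a trailing leaf never breaks the heap invariant.
    while True:
        priority, order, item = hs._heap.pop()
        if item in hs._members:
            hs._members.discard(item)
            return item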

victim_ts = s.tasks[victim_key]

ws0 = s.workers[w0.address]
ws1 = s.workers[w1.address]
ws2 = s.workers[w2.address]
victim_ts = s.tasks[victim_key]
steal.move_task_request(victim_ts, ws0, ws1)
steal.move_task_request(victim_ts, ws0, ws2)

@@ -1098,8 +1098,7 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers):
await asyncio.sleep(0.01)

# ready is a heap but we don't need last, just not the next
-_, victim_key = w0.state.ready[-1]

victim_key = w0.state.ready.peek().key
victim_ts = s.tasks[victim_key]

wsA = victim_ts.processing_on
@@ -1157,8 +1156,7 @@ async def test_steal_worker_dies_same_ip(c, s, w0, w1):
while not w0.active_keys:
await asyncio.sleep(0.01)

-victim_key = list(w0.state.ready)[-1][1]

victim_key = w0.state.ready.peek().key
victim_ts = s.tasks[victim_key]

wsA = victim_ts.processing_on
2 changes: 1 addition & 1 deletion distributed/worker.py
@@ -1877,7 +1877,7 @@ def stateof(self, key: str) -> dict[str, Any]:
return {
"executing": ts.state == "executing",
"waiting_for_data": bool(ts.waiting_for_data),
"heap": key in pluck(1, self.state.ready),
"heap": ts in self.state.ready or ts in self.state.constrained,
"data": key in self.data,
}
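The one-line change above is the payoff of the new containers: the old ready list held (priority, key) tuples, so a membership test needed a linear scan over the keys (which is what toolz's pluck(1, ...) provided), whereas the heap-backed sets hold TaskState objects and answer membership directly, which also makes it trivial to include the constrained queue. A small before/after illustration with made-up values:

from toolz import pluck

# Old representation: a heap of (priority, key) tuples; membership is an O(n) scan.
ready_old = [((1, 0, 1), "x"), ((1, 0, 2), "y")]
assert "y" in pluck(1, ready_old)

# New representation: heap-backed sets of TaskState objects give O(1) membership,
# and the constrained queue can be checked the same way:
#     ts in self.state.ready or ts in self.state.constrained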
