Store ready and constrained tasks in heapsets #6711

Merged · 3 commits · Jul 13, 2022
27 changes: 26 additions & 1 deletion distributed/collections.py
@@ -90,7 +90,7 @@ def discard(self, value: T) -> None:
self._heap.clear()

def peek(self) -> T:
"""Get the smallest element without removing it"""
"""Return the smallest element without removing it"""
if not self._data:
raise KeyError("peek into empty set")
while True:
@@ -109,6 +109,31 @@ def pop(self) -> T:
self._data.discard(value)
return value

def peekright(self) -> T:
"""Return one of the largest elements (not necessarily the largest!) without
removing it. It's guaranteed that ``self.peekright() >= self.peek()``.
"""
if not self._data:
raise KeyError("peek into empty set")
while True:
value = self._heap[-1][2]()
if value in self._data:
return value
del self._heap[-1]

def popright(self) -> T:
"""Remove and return one of the largest elements (not necessarily the largest!)
It's guaranteed that ``self.popright() >= self.peek()``.
"""
if not self._data:
raise KeyError("pop from an empty set")
while True:
_, _, vref = self._heap.pop()
value = vref()
if value in self._data:
self._data.discard(value)
return value

def __iter__(self) -> Iterator[T]:
"""Iterate over all elements. This is a O(n) operation which returns the
elements in pseudo-random order.
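As an aside (not part of the diff): a minimal usage sketch of the new peekright/popright methods. It assumes HeapSet is imported from distributed.collections and that elements support weak references, as in the tests below; the Item class here is purely illustrative.

    # Illustrative usage only; not part of this PR's diff.
    import operator

    from distributed.collections import HeapSet

    class Item:
        """Minimal weakref-able element with a sort-key attribute (hypothetical)."""
        def __init__(self, i):
            self.i = i

    heap = HeapSet(key=operator.attrgetter("i"))
    a, b, c = Item(1), Item(2), Item(3)
    for x in (a, b, c):
        heap.add(x)

    assert heap.peek() is a                     # smallest element, not removed
    assert heap.peekright().i >= heap.peek().i  # one of the largest, not removed
    largest_ish = heap.popright()               # removed; guaranteed >= peek()
    assert largest_ish.i >= heap.peek().i
    assert len(heap) == 2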
38 changes: 38 additions & 0 deletions distributed/tests/test_collections.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import heapq
import operator
import pickle
import random
@@ -157,6 +158,43 @@ def __init__(self, i):
assert set(heap) == {cx}


@pytest.mark.parametrize("peek", [False, True])
def test_heapset_popright(peek):
heap = HeapSet(key=operator.attrgetter("i"))
with pytest.raises(KeyError):
heap.peekright()
with pytest.raises(KeyError):
heap.popright()

# The heap contains broken weakrefs
for i in range(200):
c = C(f"y{i}", random.random())
heap.add(c)
if random.random() > 0.7:
heap.remove(c)

c0 = heap.peek()
while len(heap) > 1:
# These two code paths determine which of the two methods deals with the
# removal of broken weakrefs
if peek:
c1 = heap.peekright()
assert c1.i >= c0.i
assert heap.popright() is c1
else:
c1 = heap.popright()
assert c1.i >= c0.i

# Test that the heap hasn't been corrupted
h2 = heap._heap[:]
heapq.heapify(h2)
assert h2 == heap._heap

assert heap.peekright() is c0
assert heap.popright() is c0
assert not heap


def test_heapset_pickle():
"""Test pickle roundtrip for a HeapSet.

136 changes: 123 additions & 13 deletions distributed/tests/test_resources.py
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
from time import time

import pytest

@@ -12,6 +11,13 @@
from distributed import Lock, Worker
from distributed.client import wait
from distributed.utils_test import gen_cluster, inc, lock_inc, slowadd, slowinc
from distributed.worker_state_machine import (
ComputeTaskEvent,
Execute,
ExecuteSuccessEvent,
FreeKeysEvent,
TaskFinishedMsg,
)


@gen_cluster(
@@ -235,20 +241,124 @@ async def test_minimum_resource(c, s, a):
assert a.state.total_resources == a.state.available_resources


# Removed by this PR:
@gen_cluster(client=True, nthreads=[("127.0.0.1", 2, {"resources": {"A": 1}})])
async def test_prefer_constrained(c, s, a):
    futures = c.map(slowinc, range(1000), delay=0.1)
    constrained = c.map(inc, range(10), resources={"A": 1})

    start = time()
    await wait(constrained)
    end = time()
    assert end - start < 4
    assert (
        len([ts for ts in s.tasks.values() if ts.state == "memory"])
        <= len(constrained) + 2
    )
    assert s.workers[a.address].processing


# Added in its place:
@pytest.mark.parametrize("swap", [False, True])
@pytest.mark.parametrize("p1,p2,expect_key", [(1, 0, "y"), (0, 1, "x")])
def test_constrained_vs_ready_priority_1(ws, p1, p2, expect_key, swap):
    """If there are both ready and constrained tasks, those with the highest priority
    win (note: on the Worker, priorities have their sign inverted)
    """
    ws.available_resources = {"R": 1}
    ws.total_resources = {"R": 1}
    RR = {"resource_restrictions": {"R": 1}}

    ws.handle_stimulus(ComputeTaskEvent.dummy(key="clog", stimulus_id="clog"))

    stimuli = [
        ComputeTaskEvent.dummy("x", priority=(p1,), stimulus_id="s1"),
        ComputeTaskEvent.dummy("y", priority=(p2,), **RR, stimulus_id="s2"),
    ]
    if swap:
        stimuli = stimuli[::-1]  # This must be inconsequential

    instructions = ws.handle_stimulus(
        *stimuli,
        ExecuteSuccessEvent.dummy("clog", stimulus_id="s3"),
    )
    assert instructions == [
        TaskFinishedMsg.match(key="clog", stimulus_id="s3"),
        Execute(key=expect_key, stimulus_id="s3"),
    ]


@pytest.mark.parametrize("swap", [False, True])
@pytest.mark.parametrize("p1,p2,expect_key", [(1, 0, "y"), (0, 1, "x")])
def test_constrained_vs_ready_priority_2(ws, p1, p2, expect_key, swap):
"""If there are both ready and constrained tasks, but not enough available
resources, priority is inconsequential - the tasks in the ready queue are picked up.
"""
ws.nthreads = 2
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

ws.handle_stimulus(
ComputeTaskEvent.dummy(key="clog1", stimulus_id="clog1"),
ComputeTaskEvent.dummy(key="clog2", **RR, stimulus_id="clog2"),
)

# Test that both priorities and order are inconsequential
stimuli = [
ComputeTaskEvent.dummy("x", priority=(p1,), stimulus_id="s1"),
ComputeTaskEvent.dummy("y", priority=(p2,), **RR, stimulus_id="s2"),
]
if swap:
stimuli = stimuli[::-1]

instructions = ws.handle_stimulus(
*stimuli,
ExecuteSuccessEvent.dummy("clog1", stimulus_id="s3"),
)
assert instructions == [
TaskFinishedMsg.match(key="clog1", stimulus_id="s3"),
Execute(key="x", stimulus_id="s3"),
]
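Taken together, the two tests above pin down the selection rule that storing ready and constrained tasks in heapsets enables. The sketch below is a rough illustration of that rule, not the actual WorkerState code; the names ready, constrained, available_resources, and resource_restrictions mirror the attributes used in the tests.

    # Illustrative sketch of the scheduling rule exercised above; not the
    # actual WorkerState implementation.
    def next_task(ready, constrained, available_resources):
        """Pick the next task to run, or None.

        `ready` and `constrained` are assumed to be HeapSets ordered by
        priority, where a smaller priority tuple runs first (worker-side
        priorities are sign-inverted relative to the user-facing ones).
        """
        def fits(ts):
            # A constrained task can only run if all of its resources are free
            return all(
                available_resources.get(r, 0) >= amount
                for r, amount in ts.resource_restrictions.items()
            )

        runnable_constrained = bool(constrained) and fits(constrained.peek())

        if runnable_constrained and ready:
            # Both queues have a candidate: the better (smaller) priority wins,
            # as exercised by test_constrained_vs_ready_priority_1.
            if constrained.peek().priority < ready.peek().priority:
                return constrained.pop()
            return ready.pop()
        if runnable_constrained:
            return constrained.pop()
        if ready:
            # Resources are exhausted, so priority is moot and a ready task
            # runs, as exercised by test_constrained_vs_ready_priority_2.
            return ready.pop()
        return None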


def test_constrained_tasks_respect_priority(ws):
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy(key="clog", **RR, stimulus_id="clog"),
ComputeTaskEvent.dummy(key="x1", priority=(1,), **RR, stimulus_id="s1"),
ComputeTaskEvent.dummy(key="x2", priority=(2,), **RR, stimulus_id="s2"),
ComputeTaskEvent.dummy(key="x3", priority=(0,), **RR, stimulus_id="s3"),
ExecuteSuccessEvent.dummy(key="clog", stimulus_id="s4"), # start x3
ExecuteSuccessEvent.dummy(key="x3", stimulus_id="s5"), # start x1
ExecuteSuccessEvent.dummy(key="x1", stimulus_id="s6"), # start x2
)
assert instructions == [
Execute(key="clog", stimulus_id="clog"),
TaskFinishedMsg.match(key="clog", stimulus_id="s4"),
Execute(key="x3", stimulus_id="s4"),
TaskFinishedMsg.match(key="x3", stimulus_id="s5"),
Execute(key="x1", stimulus_id="s5"),
TaskFinishedMsg.match(key="x1", stimulus_id="s6"),
Execute(key="x2", stimulus_id="s6"),
]


def test_task_cancelled_and_readded_with_resources(ws):
"""See https://github.com/dask/distributed/issues/6710

A task is enqueued without resources, then cancelled by the client, then re-added
with the same key, this time with resources.
Test that resources are respected.
"""
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

ws.handle_stimulus(
ComputeTaskEvent.dummy(key="clog", **RR, stimulus_id="s1"),
ComputeTaskEvent.dummy(key="x", stimulus_id="s2"),
)
ts = ws.tasks["x"]
assert ts.state == "ready"
assert ts in ws.ready
assert ts not in ws.constrained
assert ts.resource_restrictions == {}

ws.handle_stimulus(
FreeKeysEvent(keys=["x"], stimulus_id="clog"),
ComputeTaskEvent.dummy(key="x", **RR, stimulus_id="s2"),
)
ts = ws.tasks["x"]
assert ts.state == "constrained"
assert ts not in ws.ready
assert ts in ws.constrained
assert ts.resource_restrictions == {"R": 1}


@pytest.mark.skip(reason="")
11 changes: 5 additions & 6 deletions distributed/tests/test_steal.py
@@ -1063,12 +1063,12 @@ async def test_steal_concurrent_simple(c, s, *workers):
await asyncio.sleep(0.1)

# ready is a heap but we don't need last, just not the next
_, victim_key = w0.state.ready[-1]
victim_key = w0.state.ready.peekright().key
victim_ts = s.tasks[victim_key]

ws0 = s.workers[w0.address]
ws1 = s.workers[w1.address]
ws2 = s.workers[w2.address]
victim_ts = s.tasks[victim_key]
steal.move_task_request(victim_ts, ws0, ws1)
steal.move_task_request(victim_ts, ws0, ws2)

@@ -1098,8 +1098,7 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers):
await asyncio.sleep(0.01)

# ready is a heap but we don't need last, just not the next
_, victim_key = w0.state.ready[-1]

victim_key = w0.state.ready.peekright().key
victim_ts = s.tasks[victim_key]

wsA = victim_ts.processing_on
@@ -1157,8 +1156,8 @@ async def test_steal_worker_dies_same_ip(c, s, w0, w1):
while not w0.active_keys:
await asyncio.sleep(0.01)

victim_key = list(w0.state.ready)[-1][1]

# ready is a heap but we don't need last, just not the next
victim_key = w0.state.ready.peekright().key
victim_ts = s.tasks[victim_key]

wsA = victim_ts.processing_on
2 changes: 1 addition & 1 deletion distributed/worker.py
@@ -1877,7 +1877,7 @@ def stateof(self, key: str) -> dict[str, Any]:
return {
"executing": ts.state == "executing",
"waiting_for_data": bool(ts.waiting_for_data),
"heap": key in pluck(1, self.state.ready),
"heap": ts in self.state.ready or ts in self.state.constrained,
"data": key in self.data,
}

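As a side note on the stateof change above: membership tests against these heapsets are cheap because, as visible in the distributed/collections.py hunk, HeapSet keeps a plain set of live elements next to the heap of weakrefs. The toy sketch below illustrates that pattern with hypothetical names; it is not the real class.

    import heapq
    import weakref

    class MiniHeapSet:
        """Toy heap-plus-set container; not distributed's HeapSet."""

        def __init__(self, key):
            self.key = key
            self._data = set()  # live elements; makes `in` checks O(1)
            self._heap = []     # (sort key, insertion counter, weakref) tuples
            self._count = 0

        def add(self, value):
            self._data.add(value)
            self._count += 1
            heapq.heappush(
                self._heap, (self.key(value), self._count, weakref.ref(value))
            )

        def discard(self, value):
            # Lazy removal: the matching heap entry becomes stale and is
            # skipped later by peek()/pop()-style methods, as in the diff above.
            self._data.discard(value)

        def __contains__(self, value):
            return value in self._data

        def __len__(self):
            return len(self._data)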