Skip to content

Commit

Permalink
Rollback changes to set_resources and fix fixture
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Jul 6, 2022
1 parent c837a02 commit b1f2557
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
3 changes: 2 additions & 1 deletion distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2449,7 +2449,8 @@ def ws_with_running_task(ws, request):
The task may or may not raise secede(); the tests using this fixture runs twice.
"""
ws.set_resources(R=1)
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
instructions = ws.handle_stimulus(
ComputeTaskEvent.dummy(
key="x", resource_restrictions={"R": 1}, stimulus_id="compute"
Expand Down
9 changes: 8 additions & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,14 @@ def update_data(
return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"}

async def set_resources(self, **resources: float) -> None:
self.state.set_resources(**resources)
for r, quantity in resources.items():
if r in self.state.total_resources:
self.state.available_resources[r] += (
quantity - self.state.total_resources[r]
)
else:
self.state.available_resources[r] = quantity
self.state.total_resources[r] = quantity

await retry_operation(
self.scheduler.set_resources,
Expand Down
8 changes: 0 additions & 8 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3116,14 +3116,6 @@ def validate_state(self) -> None:
for tss in self.data_needed.values():
assert len({ts.key for ts in tss}) == len(tss)

def set_resources(self, **resources: float) -> None:
for r, quantity in resources.items():
if r in self.total_resources:
self.available_resources[r] += quantity - self.total_resources[r]
else:
self.available_resources[r] = quantity
self.total_resources[r] = quantity


class BaseWorker(abc.ABC):
"""Wrapper around the :class:`WorkerState` that implements instructions handling.
Expand Down

0 comments on commit b1f2557

Please sign in to comment.