Skip to content

Commit

Permalink
Polish parsing of worker-saturation from config (#7255)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Nov 4, 2022
1 parent eb96593 commit 4b00be1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
9 changes: 7 additions & 2 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,13 @@ properties:
How frequently to balance worker loads
worker-saturation:
type: number
exclusiveMinimum: 0
oneOf:
- type: number
exclusiveMinimum: 0
# String "inf", not to be confused with .inf which in YAML means float
# infinity. This is necessary because there's no way to parse a float
# infinity from a DASK_* environment variable.
- enum: [inf]
description: |
Controls how many root tasks are sent to workers (like a `readahead`).
Expand Down
21 changes: 15 additions & 6 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,13 +1646,22 @@ def __init__(
/ 2.0
)

sat = dask.config.get("distributed.scheduler.worker-saturation")
try:
self.WORKER_SATURATION = float(sat)
except ValueError:
raise ValueError(
f"Unsupported `distributed.scheduler.worker-saturation` value {sat!r}. Must be a float."
self.WORKER_SATURATION = dask.config.get(
"distributed.scheduler.worker-saturation"
)
if self.WORKER_SATURATION == "inf":
# Special case necessary because there's no way to parse a float infinity
# from a DASK_* environment variable
self.WORKER_SATURATION = math.inf
if (
not isinstance(self.WORKER_SATURATION, (int, float))
or self.WORKER_SATURATION <= 0
):
raise ValueError( # pragma: nocover
"`distributed.scheduler.worker-saturation` must be a float > 0; got "
+ repr(self.WORKER_SATURATION)
)

self.transition_counter = 0
self._idle_transition_counter = 0
self.transition_counter_max = transition_counter_max
Expand Down
8 changes: 5 additions & 3 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,14 @@ def func(first, second):
"saturation_config, expected_task_counts",
[
(2.5, (5, 3)),
("2.5", (5, 3)),
(2.0, (4, 2)),
(1.1, (3, 2)),
(1.0, (2, 1)),
(-1.0, (1, 1)),
(float("inf"), (6, 4))
(0.1, (1, 1)),
# This is necessary because there's no way to parse a float infinity from
# a DASK_* environment variable
("inf", (6, 4)),
(float("inf"), (6, 4)),
# ^ depends on root task assignment logic; ok if this changes, the counts just need to add up to 10
],
)
Expand Down

0 comments on commit 4b00be1

Please sign in to comment.