Skip to content

Commit

Permalink
Allow stealing of fast tasks in some situations
Browse files Browse the repository at this point in the history
Previously we avoided the stealing of fast tasks in some situations.  It
was generally considered a bad idea to spend non-trivial time in order
to move around a 1ms task.  However, sometimes it's not this task that
matters, but the tasks that it unblocks.  Being strict about not
stealing may not always be ideal.

This commit relaxes stealing of fast tasks in three ways:

1.  It sets a minimum duration of tasks to 5ms,
    this being something like the overhead in a real system

2.  It changes the network latency variable in stealing.py from 100ms to 10ms

3.  It no longer completely rules out very fast tasks from stealing, but
    instead lets them compete based on their compute/transfer ratio
    (which should ordinarily be terrible)

    In cases where transfer times are trivial this becomes doable again.

Tests don't pass yet, this is up for comments
  • Loading branch information
mrocklin committed Apr 12, 2022
1 parent bd3f47e commit c9613fe
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 4 deletions.
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3441,7 +3441,7 @@ def get_task_duration(self, ts: TaskState) -> double:
"""
duration: double = ts._prefix._duration_average
if duration >= 0:
return duration
return max(0.005, duration)

s: set = self._unknown_durations.get(ts._prefix._name) # type: ignore
if s is None:
Expand Down
5 changes: 2 additions & 3 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# submission which may include code serialization. Therefore, be very
# conservative in the latency estimation to suppress too aggressive stealing
# of small tasks
LATENCY = 0.1
LATENCY = 0.01

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -199,8 +199,7 @@ def steal_time_ratio(self, ts):

ws = ts.processing_on
compute_time = ws.processing[ts]
if compute_time < 0.005: # 5ms, just give up
return None, None
compute_time = max(compute_time, 0.010)

nbytes = ts.get_nbytes_deps()
transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
Expand Down

0 comments on commit c9613fe

Please sign in to comment.