
Commit 7d2516a

Ensure reconnecting workers do not lose required data (#5436)
1 parent 6a0217e commit 7d2516a

4 files changed (+17, -16 lines)


distributed/scheduler.py (+13, -10)

@@ -2737,8 +2737,7 @@ def transition_processing_memory(

         if ws != ts._processing_on:  # someone else has this task
             logger.info(
-                "Unexpected worker completed task, likely due to "
-                "work stealing. Expected: %s, Got: %s, Key: %s",
+                "Unexpected worker completed task. Expected: %s, Got: %s, Key: %s",
                 ts._processing_on,
                 ws,
                 key,
@@ -2835,7 +2834,7 @@ def transition_memory_released(self, key, safe: bint = False):
         worker_msg = {
             "op": "free-keys",
             "keys": [key],
-            "reason": f"Memory->Released {key}",
+            "stimulus_id": f"memory-released-{time()}",
         }
         for ws in ts._who_has:
             worker_msgs[ws._address] = [worker_msg]
@@ -2935,7 +2934,11 @@ def transition_erred_released(self, key):
                 if dts._state == "erred":
                     recommendations[dts._key] = "waiting"

-            w_msg = {"op": "free-keys", "keys": [key], "reason": "Erred->Released"}
+            w_msg = {
+                "op": "free-keys",
+                "keys": [key],
+                "stimulus_id": f"erred-released-{time()}",
+            }
             for ws_addr in ts._erred_on:
                 worker_msgs[ws_addr] = [w_msg]
             ts._erred_on.clear()
@@ -3013,7 +3016,7 @@ def transition_processing_released(self, key):
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"processing-released-{time()}",
+                    "stimulus_id": f"processing-released-{time()}",
                 }
             ]

@@ -4339,9 +4342,9 @@ async def add_worker(
                     worker_msgs[address] = []
                 worker_msgs[address].append(
                     {
-                        "op": "free-keys",
+                        "op": "remove-replicas",
                         "keys": already_released_keys,
-                        "reason": f"reconnect-already-released-{time()}",
+                        "stimulus_id": f"reconnect-already-released-{time()}",
                     }
                 )
         for ts in list(parent._unrunnable):
@@ -4767,7 +4770,7 @@ def stimulus_task_finished(self, key=None, worker=None, **kwargs):
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"already-released-or-forgotten-{time()}",
+                    "stimulus_id": f"already-released-or-forgotten-{time()}",
                 }
             ]
         elif ts._state == "memory":
@@ -5965,7 +5968,7 @@ async def delete_worker_data(
             await retry_operation(
                 self.rpc(addr=worker_address).free_keys,
                 keys=list(keys),
-                reason="rebalance/replicate",
+                stimulus_id=f"delete-data-{time()}",
             )
         except OSError as e:
             # This can happen e.g. if the worker is going through controlled shutdown;
@@ -7846,7 +7849,7 @@ def _propagate_forgotten(
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"propagate-forgotten {ts.key}",
+                    "stimulus_id": f"propagate-forgotten-{time()}",
                 }
             ]
             state.remove_all_replicas(ts)
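
The scheduler-side changes have two parts: every worker-bound "free-keys" message now carries a structured stimulus_id instead of a free-form reason, and add_worker sends a reconnecting worker "remove-replicas" rather than "free-keys" for keys the scheduler has already released, which is what keeps the reconnecting worker from dropping data it still needs. A minimal sketch of the two message shapes built above, using a hypothetical key name:

    from time import time

    key = "f1"  # hypothetical key, for illustration only

    # Ordinary release, e.g. from transition_processing_released:
    free_keys_msg = {
        "op": "free-keys",
        "keys": [key],
        "stimulus_id": f"processing-released-{time()}",
    }

    # Sent from add_worker to a reconnecting worker for keys the scheduler
    # has already released; handled by Worker.handle_remove_replicas.
    remove_replicas_msg = {
        "op": "remove-replicas",
        "keys": [key],
        "stimulus_id": f"reconnect-already-released-{time()}",
    }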

distributed/tests/test_failed_workers.py (+1, -1)

@@ -488,7 +488,7 @@ def sink(*args):
     # artificially, without notifying the scheduler.
     # This can only succeed if B handles the missing data properly by
     # removing A from the known sources of keys
-    a.handle_free_keys(keys=["f1"], reason="Am I evil?")  # Yes, I am!
+    a.handle_free_keys(keys=["f1"], stimulus_id="Am I evil?")  # Yes, I am!
     result_fut = c.submit(sink, futures, workers=x.address)

     await result_fut

distributed/tests/test_worker.py (-2)

@@ -2444,7 +2444,6 @@ async def test_hold_on_to_replicas(c, s, *workers):
         await asyncio.sleep(0.01)


-@pytest.mark.flaky(reruns=10, reruns_delay=5)
 @gen_cluster(client=True)
 async def test_worker_reconnects_mid_compute(c, s, a, b):
     """Ensure that, if a worker disconnects while computing a result, the scheduler will
@@ -2513,7 +2512,6 @@ def fast_on_a(lock):
         await asyncio.sleep(0.001)


-@pytest.mark.flaky(reruns=10, reruns_delay=5)
 @gen_cluster(client=True)
 async def test_worker_reconnects_mid_compute_multiple_states_on_scheduler(c, s, a, b):
     """

distributed/worker.py (+3, -3)

@@ -1596,7 +1596,7 @@ def update_data(
             self.batched_stream.send(msg)
         return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"}

-    def handle_free_keys(self, comm=None, keys=None, reason=None):
+    def handle_free_keys(self, comm=None, keys=None, stimulus_id=None):
         """
         Handler to be called by the scheduler.

@@ -1607,14 +1607,14 @@ def handle_free_keys(self, comm=None, keys=None, reason=None):
         still decide to hold on to the data and task since it is required by an
         upstream dependency.
         """
-        self.log.append(("free-keys", keys, reason))
+        self.log.append(("free-keys", keys, stimulus_id))
         recommendations = {}
         for key in keys:
             ts = self.tasks.get(key)
             if ts:
                 recommendations[ts] = "released" if ts.dependents else "forgotten"

-        self.transitions(recommendations, stimulus_id=reason)
+        self.transitions(recommendations, stimulus_id=stimulus_id)

     def handle_remove_replicas(self, keys, stimulus_id):
         """Stream handler notifying the worker that it might be holding unreferenced,
