-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
update_who_has can remove workers #6342
Conversation
b97a134
to
c39288a
Compare
Unit Test Results 15 files ± 0 15 suites ±0 6h 34m 9s ⏱️ +15s For more details on these failures, see this check. Results for commit e8f5ef5. ± Comparison against base commit 5feb171. ♻️ This comment has been updated with latest results. |
8c3617b
to
b67f5c2
Compare
This is ready for review and merge. |
d7934d8
to
e6fc28e
Compare
All review comments addressed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good, just two small things
distributed/worker.py
Outdated
# All workers which previously held a replica of the key are either | ||
# in flight or busy. Kick off ensure_communicating to try fetching | ||
# the data from the new worker. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# All workers which previously held a replica of the key are either | |
# in flight or busy. Kick off ensure_communicating to try fetching | |
# the data from the new worker. | |
# We were holding back from fetching this key, possibly because we | |
# didn't know of any workers holding it (that weren't busy, or didn't | |
# already have in-flight data requests). | |
# In case having a new worker available changes things, | |
# kick off ensure_communicating to try fetching the data | |
# from this new worker. |
There are a few other reasons why the task might still be in fetch
- we're over the
total_out_connections
limit - we're over the
comm_threshold_bytes
limits - we're paused
Just to make it clear that we're being a little optimistic here, not necessarily testing for all cases in which ensure_communicating = True
will actually have an effect (which is fine).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated comment
distributed/worker.py
Outdated
if ensure_communicating: | ||
recs, instructions = merge_recs_instructions( | ||
(recs, instructions), | ||
self._ensure_communicating(stimulus_id=stimulus_id), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like I said in cbccc86#r73797288, it seems odd to me to call _ensure_communicating
directly. Why not just append an EnsureCommunicatingAfterTransitions
instruction on L3535?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair. We'll probably revert it soon as per our recent chat with @fjetter , but it makes sense to have an organic behaviour for now.
@fjetter are you happy to merge this? |
# Do not use handle_missing_data, since it would cause the scheduler to call | ||
# handle_free_keys(["y"]) on a | ||
s.remove_replica(ts=s.tasks["x"], ws=s.workers[b.address]) | ||
# We used a scheduler internal call, thus corrupting its state. | ||
# Don't crash at the end of the test. | ||
s.validate = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an anti pattern and we should not intentionally put the scheduler in a corrupt state.
Is the condition this test is provoking even real? I would argue that it is not even intended for the key to get into a missing state here but the scheduler should rather ensure the task is released
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's real. I rewrote the test.
# Wipe x from the worker and lose the message that would inform the scheduler | ||
a.handle_remove_replicas(keys=["x"], stimulus_id="test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would this request ever be triggered if the key is the only one on the cluster?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scheduler thinks w1 and w2 hold a replica.
w2 is unresponsive, but the scheduler doesn't know yet as it didn't time out.
The AMM decides to ask w1 to drop its replica.
Adding a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it mean "w2 is unresponsive"? It is dead? If so, why not kill the worker? Just being unresponsive, i.e. not hearbeating for a while, doesn't trigger any action. If anything this would trigger the removal of the worker.
Is there a way to trigger this using scheduler API? handle_remove_replica
is a signal that's always originating from the scheduler, calling it here out of the blue doesn't seem right
# Lose the message that would inform the scheduler. | ||
# In theory, this should not happen. | ||
# In practice, in case of TPC connection fault, BatchedSend can drop messages. | ||
assert a.batched_stream.buffer == [ | ||
{"key": "x", "stimulus_id": "test", "op": "release-worker-data"} | ||
] | ||
a.batched_stream.buffer.clear() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the connection fails, there are a couple of other mechanisms around that should recover from any lost messages. This clearly doesn't work well which is why we're likely removing the worker reconnect (#6361)
Apart from this failure scenario, we should always assume that a message goes through. Is there any other way to trigger this condition? I don't feel comfortable with clearing the buffer for a test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above: I cannot think of how to send it out of sync in real life, short of a reconnect.
The question is: is not being able to think of anything from the top of our head a reason good enough not to have a failsafe? (not a loaded question)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The question is: is not being able to think of anything from the top of our head a reason good enough not to have a failsafe? (not a loaded question)
Well, I think there is no universally true answer to this question.
Why I care so much about this specific case is because this test apparently motivates an instruction in update_who_has
, see https://github.com/dask/distributed/pull/6342/files#r873561939
The argument, as it stands right now, is that we need this instruction because otherwise this test would deadlock. However, this test is an artificial case that intentionally corrupts state and I don't believe we should be able to recover from any kind of state corruption automatically (that's just unrealistic, I believe)
- If we cannot reach this state "naturally" we should remove the instruction and the entire code branch that leads up to it because it is effectively dead code and additional complexity.
- If this code branch is not dead and can be reached naturally, we should invest the time to construct this naturally occurring state to make sure the instruction is indeed doing what it is supposed to and doesn't have unexpected side effects.
If neither of these two options are satisfying, I would propose to instead go for
- keep the code path leading up the the MissingDataMsg but do not issue this instruction but rather fail hard. This will be uncomfortable but we'll know why and how this case is hit and can fix it reliably
# 2. b is unresponsive, but the scheduler doesn't know yet as it didn't time out. | ||
# 3. The AMM decides to ask a to drop its replica. | ||
a.handle_remove_replicas(keys=["x"], stimulus_id="test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- AMM would not request the removal of a replica if there is only one replica available
- If a worker is "unresponsive" this would mean that some kind of worker is notifying the scheduler about this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- AMM will not request the removal of a replica if it believes it's the only one replica available. Hence the unresponsive worker that hasn't reached timeout.
- unresponsive as in, it just froze but it didn't trigger any timeouts yet. When it hits timeouts later, we'll lose all of its contents.
some kind of worker is notifying the scheduler about this.
First of all, you might as well be in the condition where theworker that holds the other replica of x has been unresponsive for less than 30s, which means that nobody knows yet.
Second, this is more nuanced than you picture it.
If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about and send a {op: missing-data}
for the specific keys that it was trying to fetch at the time. What will likely happen soon afterwards is that the scheduler asks it to fetch more keys from the same unresponsive worker, thus hitting another timeout, and then another and again and again, for the next 5 minutes.
This design is clearly bad. We should have a design session about it - the most obvious option (but it may be too aggressive) would be to have a {op: missing-worker}
call where, as soon as any worker times out on gather-dep, the whole worker is removed from the cluster, together with all of the keys that it has in memory, including those the reporting worker doesn't know about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
It will not only mark it as lost but it will try to close the worker, ungracefully
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about
Not if it doesn't receive any data but if the connection attempt fails, assuming there is no pooled connection.
I'm struggling a bit with your usage of the term unresponsive. I don't know if this means a dead worker, high latencies, a blocked GIL / event loop, etc.
The way I understand unresponsiveness is if the worker doesn't respond to messages in a given time and the only place where we expect the worker application code to respond in a certain time is during handshakes upon establishment of new RPC connections.
If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about and send a {op: missing-data} for the specific keys that it was trying to fetch at the time. What will likely happen soon afterwards is that the scheduler asks it to fetch more keys from the same unresponsive worker, thus hitting another timeout, and then another and again and again, for the next 5 minutes.
if the scheduler receives a missing-data(ts, errant_worker)
, it will call a Scheduler.remove_replica(self, ts, ws)
. If ws is the only worker with a replica, the scheduler will reschedule the task, not ask workers to try to fetch the data again.
I understand that there are various problems around but this all doesn't explain how the worker would receive a remove-replica
signal if the scheduler only knows of one worker. This is basically a corruption of the cluster-wide state which is why you need to flush the buffer (#6342 (comment)) to recover.
6. the scheduler responds {"x": []}, because w1 in the meantime has lost the key. | ||
7. x is transitioned fetch -> missing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would the scheduler respond with this message? Why would the worker need to transition to missing? I think the update_who_has should not do anything in this case but rather the scheduler should eventually tell the worker to release the entire task.
If the "update who_has" was not via a dedicated RPC this entire scenario would even be impossible, wouldn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the scheduler should eventually tell the worker to release the entire task.
This will happen for dependencies tracked by {op: compute-task}
, but not for keys fetched through {op: acquire-replicas}
.
Why would the scheduler respond with this message?
- Because a third worker recently sent to the scheduler
{op: missing-data, errant_worker: b, key: x}
- Because the network between b and the scheduler is malfunctioning, so the scheduler hasn't received a heartbeat from b for 5 minutes, but for whatever reason the network between workers a and b is still functioning. This second use case is reproduced in the test (lines 404-407). I could have done the first one, but it would have been more tedious to implement due to time sensitivity.
Because it hasn't received any heartbeat f
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because a third worker recently sent to the scheduler {op: missing-data, errant_worker: b, key: x}
For a dependency this should still be followed by an appropriate free-keys message such that the key will properly released by this worker. It feels like AMM should keep track of the workers it asked to fetch a key to allow for the same mechanism. I would feel more comfortable if both "types" of tasks are handled the same on worker side
Because the network between b and the scheduler is malfunctioning
If the network between a worker and the scheduler is down, we're closing the worker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had some review comments pending. Will give it another look shortly
# 2. b is unresponsive, but the scheduler doesn't know yet as it didn't time out. | ||
# 3. The AMM decides to ask a to drop its replica. | ||
a.handle_remove_replicas(keys=["x"], stimulus_id="test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
It will not only mark it as lost but it will try to close the worker, ungracefully
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about
Not if it doesn't receive any data but if the connection attempt fails, assuming there is no pooled connection.
I'm struggling a bit with your usage of the term unresponsive. I don't know if this means a dead worker, high latencies, a blocked GIL / event loop, etc.
The way I understand unresponsiveness is if the worker doesn't respond to messages in a given time and the only place where we expect the worker application code to respond in a certain time is during handshakes upon establishment of new RPC connections.
If a heartbeat doesn't reach the scheduler for 5 minutes, the scheduler will remove the worker and will mark all of the keys that were in memory on that worker as lost.
If a gather_dep doesn't receive any data for 30 seconds, the waiting worker will remove all of the keys from has_what that it knows about and send a {op: missing-data} for the specific keys that it was trying to fetch at the time. What will likely happen soon afterwards is that the scheduler asks it to fetch more keys from the same unresponsive worker, thus hitting another timeout, and then another and again and again, for the next 5 minutes.
if the scheduler receives a missing-data(ts, errant_worker)
, it will call a Scheduler.remove_replica(self, ts, ws)
. If ws is the only worker with a replica, the scheduler will reschedule the task, not ask workers to try to fetch the data again.
I understand that there are various problems around but this all doesn't explain how the worker would receive a remove-replica
signal if the scheduler only knows of one worker. This is basically a corruption of the cluster-wide state which is why you need to flush the buffer (#6342 (comment)) to recover.
# Lose the message that would inform the scheduler. | ||
# In theory, this should not happen. | ||
# In practice, in case of TPC connection fault, BatchedSend can drop messages. | ||
assert a.batched_stream.buffer == [ | ||
{"key": "x", "stimulus_id": "test", "op": "release-worker-data"} | ||
] | ||
a.batched_stream.buffer.clear() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The question is: is not being able to think of anything from the top of our head a reason good enough not to have a failsafe? (not a loaded question)
Well, I think there is no universally true answer to this question.
Why I care so much about this specific case is because this test apparently motivates an instruction in update_who_has
, see https://github.com/dask/distributed/pull/6342/files#r873561939
The argument, as it stands right now, is that we need this instruction because otherwise this test would deadlock. However, this test is an artificial case that intentionally corrupts state and I don't believe we should be able to recover from any kind of state corruption automatically (that's just unrealistic, I believe)
- If we cannot reach this state "naturally" we should remove the instruction and the entire code branch that leads up to it because it is effectively dead code and additional complexity.
- If this code branch is not dead and can be reached naturally, we should invest the time to construct this naturally occurring state to make sure the instruction is indeed doing what it is supposed to and doesn't have unexpected side effects.
If neither of these two options are satisfying, I would propose to instead go for
- keep the code path leading up the the MissingDataMsg but do not issue this instruction but rather fail hard. This will be uncomfortable but we'll know why and how this case is hit and can fix it reliably
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
High level, I still have two problems with this PR
- The tests are pretty artificial and I'm not convinced they are testing real use cases. This worries me a bit since I'm concerned we're maintaining more complex code just to enable these cases.
- I don't feel comfortable for update_who_has to generate
RecsInstrs
. This requires us to put these complicated constructions in everywhere and I feel the cases that are handled by update_who_has should be handled by the transition system. I'm still struggling to understand why it has or should be handled there
recommendations, instructions = merge_recs_instructions(
(recommendations, []),
self._update_who_has(who_has, stimulus_id=stimulus_id),
)
self.transitions(recommendations, stimulus_id=stimulus_id)
self._handle_instructions(instructions)
distributed/worker.py
Outdated
list(ts.who_has), | ||
list(workers), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should log all of this. This list can be pretty large in a real world scenario and this is called frequently. We only limit the number of log entries but not the byte size of the log in total.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing it to show only a length-constrained version of the differences
assert_story( | ||
b.log, | ||
# FIXME: This log should be replaced with a StateMachineEvent log | ||
[ | ||
(f1.key, "ensure-task-exists", "released"), | ||
(f1.key, "released", "fetch", "fetch", {}), | ||
(f1.key, "compute-task", "fetch"), | ||
(f1.key, "put-in-memory"), | ||
], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this log necessary now? It wasn't there before
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To show evidence that recomputing f1 was in fact what happened, as opposed to a.close() not working as intended in the test which would cause a successful fetch->flight->memory cycle.
6. the scheduler responds {"x": []}, because w1 in the meantime has lost the key. | ||
7. x is transitioned fetch -> missing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because a third worker recently sent to the scheduler {op: missing-data, errant_worker: b, key: x}
For a dependency this should still be followed by an appropriate free-keys message such that the key will properly released by this worker. It feels like AMM should keep track of the workers it asked to fetch a key to allow for the same mechanism. I would feel more comfortable if both "types" of tasks are handled the same on worker side
Because the network between b and the scheduler is malfunctioning
If the network between a worker and the scheduler is down, we're closing the worker
# Sever connection between b and s, but not between b and a. | ||
# If a tries fetching from b after this, b will keep responding {status: busy}. | ||
b.periodic_callbacks["heartbeat"].stop() | ||
await s.remove_worker(b.address, close=False, stimulus_id="test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC you are preparing the scheduler state such that it believes the worker is dead. This is only partially true after this statement because the stream is still open. This is flagged in #6390 as a problem and it suggest we should close the stream as well, i.e. remove the close keyword.
Why this bothers me a bit is because I believe in a real world setup where this connection is severed, the cluster would be able to self heal because the connection between A and B would also close. This would be a different mechanism than what this test is specifically testing.
|
||
|
||
@gen_cluster(client=True, nthreads=[("", 1)]) | ||
async def test_self_denounce_missing_data(c, s, a): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I slightly modified the current version of this test to verify it against main. I didn't like the clearing of the batched send but understand that LockedCommPool does not work since the BatchedSend
is initialized before the pool can be installed.
I modified it such that we re-submit the cleared message and I added code to ensure that the ordering is still as expected. indeed the cluster unblocks after we resubmit the message. Therefore this test only deadlocks on main because we're dropping messages intentionally which should never happen in a real setup.
The below passes on main
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_self_denounce_missing_data(c, s, a):
x = c.submit(inc, 1, key="x")
await x
# Wipe x from the worker. This simulates the following condition:
# 1. The scheduler thinks a and b hold a replica of x.
# 2. b is unresponsive, but the scheduler doesn't know yet as it didn't time out.
# 3. The AMM decides to ask a to drop its replica.
a.handle_remove_replicas(keys=["x"], stimulus_id="test")
# Lose the message that would inform the scheduler.
# In theory, this should not happen.
# In practice, in case of TPC connection fault, BatchedSend can drop messages.
count = a.batched_stream.batch_count
buf = list(a.batched_stream.buffer)
assert buf == [
{"key": "x", "stimulus_id": "test", "op": "release-worker-data"}
]
a.batched_stream.buffer.clear()
y = c.submit(inc, x, key="y")
# The scheduler tries computing y, but a responds that x is not available.
# The scheduler kicks off the computation of x and then y from scratch.
async def check_result():
assert await y == 3
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(check_result(), 0.5)
# There were no further messages over the batched send.
# WE need to assert on this to make sure the ordering of events is not corrupted
assert a.batched_stream.batch_count == count
a.batched_stream.send(*buf)
await check_result()
assert_story(
a.story("compute-task"),
[
("x", "compute-task", "released"),
# The scheduler tries computing y a first time and fails.
# This line would not be here if we didn't lose the
# {"op": "release-worker-data"} message earlier.
("y", "compute-task", "released"),
# The scheduler receives the {"op": "missing-data"} message from the
# worker. This makes the computation of y to fail. The scheduler reschedules
# x and then y.
("x", "compute-task", "released"),
("y", "compute-task", "released"),
],
strict=True,
)
del x
while "x" in a.data:
await asyncio.sleep(0.01)
assert a.tasks["x"].state == "released"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the above version you can see that we're indeed blocked fetching y
(check_result) but after the message is submitted to the scheduler, everything works.
@fjetter added test for fetch->flight transition, as discussed yesterday |
@@ -2098,8 +2098,7 @@ def transition_released_waiting( | |||
if dep_ts.state != "memory": | |||
ts.waiting_for_data.add(dep_ts) | |||
dep_ts.waiters.add(ts) | |||
if dep_ts.state not in {"fetch", "flight"}: | |||
recommendations[dep_ts] = "fetch" | |||
recommendations[dep_ts] = "fetch" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fetch->fetch is a no-op.
flight->fetch is a no-op if not ts.done.
This aligns the logic here and in handle_acquire_replicas.
All test failures are unrelated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still worried about test_self_denounce_missing_data
. It is a very specific test and I do not think it is realistic. It deliberately corrupts cluster-wide state and expects the system to heal. I consider this to be an unrealistic expectation.
If we are able to detect state corruption, I think we should raise hard but this is out of scope for this PR. However, I also do think we should not encode behavior in our tests we do not actually expect from the system.
if ts.state == "fetch": | ||
self.data_needed_per_worker[worker].push(ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if ts.state == "fetch": | |
self.data_needed_per_worker[worker].push(ts) | |
self.data_needed_per_worker[worker].push(dep_ts) |
I'm a bit on the fence with this.
Why this bothers me is because this is the only place in the entire worker where we're modifying Worker
state depending on what TaskState.state
is outside of actual transition functions.
I understand the reasoning but it feels a bit odd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the change? There's no reason to push it into the heap if state=memory or flight.
data_needed_per_worker
should be a variant of data_needed
, whose only purpose is to be able to fetch out of order.
See
distributed/distributed/worker.py
Lines 2005 to 2014 in 5e9e97f
def _add_to_data_needed(self, ts: TaskState, stimulus_id: str) -> RecsInstrs: | |
self.data_needed.push(ts) | |
for w in ts.who_has: | |
self.data_needed_per_worker[w].push(ts) | |
# This is the same as `return self._ensure_communicating()`, except that when | |
# many tasks transition to fetch at the same time, e.g. from a single | |
# compute-task or acquire-replicas command from the scheduler, it allows | |
# clustering the transfers into less GatherDep instructions; see | |
# _select_keys_for_gather(). | |
return {}, [EnsureCommunicatingAfterTransitions(stimulus_id=stimulus_id)] |
which is a helper function exclusively of
- transition_missing_fetch
- transition_released_fetch
- transition_flight_fetch
(the other two transition_*_fetch functions don't call it because they don't set the state to fetch).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as I said, it feels odd but I understand. Let's move on
# Wipe x from the worker. This simulates the scheduler calling | ||
# delete_worker_data(a.address, ["x"]), but the RPC call has not returned yet. | ||
a.handle_free_keys(keys=["x"], stimulus_id="test") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If anything this means that delete_worker_data should not work this way. I don't think we should free any keys via direct RPC calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scheduler.delete_worker_data
is only used in replicate and rebalance. Both methods are flagged as being unreliable if the cluster is not at rest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels unnatural for the scheduler to issue any free-keys message if there is only one replica around, even for a rebalance or replicate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed the whole test and slapped a pragma: nocover on the code path. Let me know if I interpreted your opinion correctly.
Thanks for your hard work on this @crusaderky ! |
Contributes to #5896
distributed.worker.TaskState.who_has
after a handle_compute call of a dependent key