-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[data] MapOperator.num_active_tasks should exclude pending actors #46364
Changes from all commits
6fd63af
eac1fe3
e9b4357
584d28d
8ccdb1a
f0a01a1
5796deb
d2de473
d7d79c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -414,6 +414,17 @@ def implements_accurate_memory_accounting(self) -> bool: | |
def supports_fusion(self) -> bool: | ||
return self._supports_fusion | ||
|
||
def num_active_tasks(self) -> int: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I'm understanding correctly, this change makes @raulchen do have any ideas for how we can address the issues while avoiding this discrepancy? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought of this as well. It's indeed a bit confusing. |
||
# Override `num_active_tasks` to only include data tasks and exclude | ||
# metadata tasks, which are used by the actor-pool map operator to | ||
# check if a newly created actor is ready. | ||
# The reasons are because: | ||
# 1. `PhysicalOperator.completed` checks `num_active_tasks`. The operator | ||
# should be considered completed if there are still pending actors. | ||
# 2. The number of active tasks in the progress bar will be more accurate | ||
# to reflect the actual data processing tasks. | ||
return len(self._data_tasks) | ||
|
||
|
||
def _map_task( | ||
map_transformer: MapTransformer, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is "readiness checking task" referring to in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's
_MapWorker.get_location
.