Skip to content

Commit

Permalink
Adding default 1 retry for any airflow worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
valayDave committed Jul 29, 2022
1 parent 63d7c12 commit de00042
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,18 @@ def _get_airflow_schedule_interval(self):
def _get_retries(self, node):
max_user_code_retries = 0
max_error_retries = 0
foreach_default_retry = 1
# Different decorators may have different retrying strategies, so take
# the max of them.
for deco in node.decorators:
user_code_retries, error_retries = deco.step_task_retry_count()
max_user_code_retries = max(max_user_code_retries, user_code_retries)
max_error_retries = max(max_error_retries, error_retries)

parent_is_foreach = any( # The immediate parent is a foreach node.
self.graph[n].type == "foreach" for n in node.in_funcs
)
if parent_is_foreach:
max_user_code_retries + foreach_default_retry
return max_user_code_retries, max_user_code_retries + max_error_retries

def _get_retry_delay(self, node):
Expand Down

0 comments on commit de00042

Please sign in to comment.