Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions airflow-core/newsfragments/56866.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Support numeric multiplier values for retry_exponential_backoff parameter

The ``retry_exponential_backoff`` parameter now accepts numeric values to specify custom exponential backoff multipliers for task retries. Previously, this parameter only accepted boolean values (``True`` or ``False``), with ``True`` using a hardcoded multiplier of ``2.0``.

**New behavior:**

- Numeric values (e.g., ``2.0``, ``3.5``) directly specify the exponential backoff multiplier
- ``retry_exponential_backoff=2.0`` doubles the delay between each retry attempt
- ``retry_exponential_backoff=0`` or ``False`` disables exponential backoff (uses fixed ``retry_delay``)

**Backwards compatibility:**

Existing DAGs using boolean values continue to work:

- ``retry_exponential_backoff=True`` → converted to ``2.0`` (maintains original behavior)
- ``retry_exponential_backoff=False`` → converted to ``0.0`` (no exponential backoff)

**API changes:**

The REST API schema for ``retry_exponential_backoff`` has changed from ``type: boolean`` to ``type: number``. API clients must use numeric values (boolean values will be rejected).

**Migration:**

While boolean values in Python DAGs are automatically converted for backwards compatibility, we recommend updating to explicit numeric values for clarity:

- Change ``retry_exponential_backoff=True`` → ``retry_exponential_backoff=2.0``
- Change ``retry_exponential_backoff=False`` → ``retry_exponential_backoff=0``

* Types of change

* [ ] Dag changes
* [ ] Config changes
* [x] API changes
* [ ] CLI changes
* [x] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [ ] Code interface changes
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TaskResponse(BaseModel):
pool_slots: float | None
execution_timeout: TimeDeltaWithValidation | None
retry_delay: TimeDeltaWithValidation | None
retry_exponential_backoff: bool
retry_exponential_backoff: float
priority_weight: float | None
weight_rule: str | None
ui_color: str | None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12780,7 +12780,7 @@ components:
- $ref: '#/components/schemas/TimeDelta'
- type: 'null'
retry_exponential_backoff:
type: boolean
type: number
title: Retry Exponential Backoff
priority_weight:
anyOf:
Expand Down
14 changes: 12 additions & 2 deletions airflow-core/src/airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from airflow.exceptions import AirflowException, NotMapped
from airflow.sdk import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions._internal.abstractoperator import DEFAULT_RETRY_DELAY_MULTIPLIER
from airflow.sdk.definitions._internal.node import DAGNode
from airflow.sdk.definitions.mappedoperator import MappedOperator as TaskSDKMappedOperator
from airflow.serialization.definitions.param import SerializedParamsDict
Expand Down Expand Up @@ -277,13 +278,22 @@ def retry_delay(self) -> datetime.timedelta:
return self.partial_kwargs.get("retry_delay", SerializedBaseOperator.retry_delay)

@property
def retry_exponential_backoff(self) -> bool:
return bool(self.partial_kwargs.get("retry_exponential_backoff"))
def retry_exponential_backoff(self) -> float:
value = self.partial_kwargs.get("retry_exponential_backoff", 0)
if value is True:
return 2.0
if value is False:
return 0.0
return float(value)

@property
def max_retry_delay(self) -> datetime.timedelta | None:
return self.partial_kwargs.get("max_retry_delay")

@property
def retry_delay_multiplier(self) -> float:
return float(self.partial_kwargs.get("retry_delay_multiplier", DEFAULT_RETRY_DELAY_MULTIPLIER))

@property
def weight_rule(self) -> PriorityWeightStrategy:
return validate_and_load_priority_weight_strategy(
Expand Down
7 changes: 4 additions & 3 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,14 +958,15 @@ def next_retry_datetime(self):
from airflow.sdk.definitions._internal.abstractoperator import MAX_RETRY_DELAY

delay = self.task.retry_delay
if self.task.retry_exponential_backoff:
multiplier = self.task.retry_exponential_backoff if self.task.retry_exponential_backoff != 0 else 1.0
if multiplier != 1.0 and multiplier > 0:
try:
# If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
# we must round up prior to converting to an int, otherwise a divide by zero error
# will occur in the modded_hash calculation.
# this probably gives unexpected results if a task instance has previously been cleared,
# because try_number can increase without bound
min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1)))
min_backoff = math.ceil(delay.total_seconds() * (multiplier ** (self.try_number - 1)))
except OverflowError:
min_backoff = MAX_RETRY_DELAY
self.log.warning(
Expand All @@ -987,7 +988,7 @@ def next_retry_datetime(self):
).hexdigest(),
16,
)
# between 1 and 1.0 * delay * (2^retry_number)
# between 1 and 1.0 * delay * (multiplier^retry_number)
modded_hash = min_backoff + ti_hash % min_backoff
# timedelta has a maximum representable value. The exponentiation
# here means this value can be exceeded after a certain number
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@
"pool_slots": { "type": "number", "default": 1 },
"execution_timeout": { "$ref": "#/definitions/timedelta" },
"retry_delay": { "$ref": "#/definitions/timedelta", "default": 300.0 },
"retry_exponential_backoff": { "type": "boolean", "default": false },
"retry_exponential_backoff": { "type": "number", "default": 0 },
"max_retry_delay": { "$ref": "#/definitions/timedelta" },
"params": { "$ref": "#/definitions/params" },
"priority_weight": { "type": "number", "default": 1 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
resources: dict[str, Any] | None = None
retries: int = 0
retry_delay: datetime.timedelta = datetime.timedelta(seconds=300)
retry_exponential_backoff: bool = False
retry_exponential_backoff: float = 0
run_as_user: str | None = None

start_date: datetime.datetime | None = None
Expand Down Expand Up @@ -1633,6 +1633,11 @@ def populate_operator(
elif k == "weight_rule":
k = "_weight_rule"
v = decode_priority_weight_strategy(v)
elif k == "retry_exponential_backoff":
if isinstance(v, bool):
v = 2.0 if v else 0
else:
v = float(v)
else:
# Apply centralized deserialization for all other fields
v = cls._deserialize_field_value(k, v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6132,7 +6132,7 @@ export const $TaskResponse = {
]
},
retry_exponential_backoff: {
type: 'boolean',
type: 'number',
title: 'Retry Exponential Backoff'
},
priority_weight: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ export type TaskResponse = {
pool_slots: number | null;
execution_timeout: TimeDelta | null;
retry_delay: TimeDelta | null;
retry_exponential_backoff: boolean;
retry_exponential_backoff: number;
priority_weight: number | null;
weight_rule: string | null;
ui_color: string | null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def test_should_respond_200(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": "2020-06-15T00:00:00Z",
"task_id": "op1",
"task_display_name": "op1",
Expand Down Expand Up @@ -149,7 +149,7 @@ def test_mapped_task(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": "2020-06-15T00:00:00Z",
"task_id": "mapped_task",
"task_display_name": "mapped_task",
Expand Down Expand Up @@ -187,7 +187,7 @@ def test_unscheduled_task(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": None,
"task_id": None,
"task_display_name": None,
Expand Down Expand Up @@ -246,7 +246,7 @@ def test_should_respond_200_serialized(self, test_client, testing_dag_bundle):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": "2020-06-15T00:00:00Z",
"task_id": "op1",
"task_display_name": "op1",
Expand Down Expand Up @@ -311,7 +311,7 @@ def test_should_respond_200(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": "2020-06-15T00:00:00Z",
"task_id": "op1",
"task_display_name": "op1",
Expand Down Expand Up @@ -343,7 +343,7 @@ def test_should_respond_200(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": "2020-06-16T00:00:00Z",
"task_id": self.task_id2,
"task_display_name": self.task_id2,
Expand Down Expand Up @@ -387,7 +387,7 @@ def test_get_tasks_mapped(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": "2020-06-15T00:00:00Z",
"task_id": "mapped_task",
"task_display_name": "mapped_task",
Expand Down Expand Up @@ -418,7 +418,7 @@ def test_get_tasks_mapped(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": "2020-06-15T00:00:00Z",
"task_id": self.task_id3,
"task_display_name": self.task_id3,
Expand Down Expand Up @@ -466,7 +466,7 @@ def test_get_unscheduled_tasks(self, test_client):
"queue": "default",
"retries": 0.0,
"retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"start_date": None,
"task_id": task_id,
"task_display_name": task_id,
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/models/test_mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ def group(n: int) -> None:
timedelta(seconds=10),
timedelta(seconds=5),
timedelta(seconds=60),
True,
2.0,
1,
2,
"user",
Expand Down
31 changes: 28 additions & 3 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ def test_next_retry_datetime(self, dag_maker):
bash_command="exit 1",
retries=3,
retry_delay=delay,
retry_exponential_backoff=True,
retry_exponential_backoff=2.0,
max_retry_delay=max_delay,
)
ti = dag_maker.create_dagrun().task_instances[0]
Expand Down Expand Up @@ -649,7 +649,7 @@ def test_next_retry_datetime_returns_max_for_overflow(self, dag_maker):
bash_command="exit 1",
retries=3,
retry_delay=delay,
retry_exponential_backoff=True,
retry_exponential_backoff=2.0,
max_retry_delay=max_delay,
)
ti = dag_maker.create_dagrun().task_instances[0]
Expand All @@ -675,7 +675,7 @@ def test_next_retry_datetime_short_or_zero_intervals(self, dag_maker, seconds):
bash_command="exit 1",
retries=3,
retry_delay=delay,
retry_exponential_backoff=True,
retry_exponential_backoff=2.0,
max_retry_delay=max_delay,
)
ti = dag_maker.create_dagrun().task_instances[0]
Expand All @@ -685,6 +685,31 @@ def test_next_retry_datetime_short_or_zero_intervals(self, dag_maker, seconds):
date = ti.next_retry_datetime()
assert date == ti.end_date + datetime.timedelta(seconds=1)

def test_next_retry_datetime_with_custom_multiplier(self, dag_maker):
delay = datetime.timedelta(minutes=4)

with dag_maker(dag_id="fail_dag"):
task = BashOperator(
task_id="task_with_custom_multiplier",
bash_command="exit 1",
retries=3,
retry_delay=delay,
retry_exponential_backoff=5.0,
)
ti = dag_maker.create_dagrun().task_instances[0]
ti.task = task
ti.end_date = pendulum.instance(timezone.utcnow())

ti.try_number = 1
date = ti.next_retry_datetime()
period = ti.end_date.add(seconds=1200) - ti.end_date.add(seconds=240)
assert date in period

ti.try_number = 2
date = ti.next_retry_datetime()
period = ti.end_date.add(seconds=6000) - ti.end_date.add(seconds=1200)
assert date in period

@pytest.mark.usefixtures("test_pool")
def test_mapped_task_reschedule_handling_clear_reschedules(self, dag_maker, task_reschedules_for_ti):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1530,7 +1530,7 @@ def test_no_new_fields_added_to_base_operator(self):
"resources": None,
"retries": 0,
"retry_delay": timedelta(0, 300),
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"run_as_user": None,
"start_date": None,
"start_from_trigger": False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

class TestNotInRetryPeriodDep:
def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)):
task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False)
task = Mock(retry_delay=retry_delay, retry_exponential_backoff=0)
ti = TaskInstance(task=task, state=state, dag_version_id=mock.MagicMock())
ti.end_date = end_date
return ti
Expand Down
2 changes: 1 addition & 1 deletion airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,7 @@ class TaskResponse(BaseModel):
pool_slots: Annotated[float | None, Field(title="Pool Slots")] = None
execution_timeout: TimeDelta | None = None
retry_delay: TimeDelta | None = None
retry_exponential_backoff: Annotated[bool, Field(title="Retry Exponential Backoff")]
retry_exponential_backoff: Annotated[float, Field(title="Retry Exponential Backoff")]
priority_weight: Annotated[float | None, Field(title="Priority Weight")] = None
weight_rule: Annotated[str | None, Field(title="Weight Rule")] = None
ui_color: Annotated[str | None, Field(title="Ui Color")] = None
Expand Down
17 changes: 9 additions & 8 deletions task-sdk/src/airflow/sdk/bases/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def partial(**kwargs):
"queue": DEFAULT_QUEUE,
"retries": DEFAULT_RETRIES,
"retry_delay": DEFAULT_RETRY_DELAY,
"retry_exponential_backoff": False,
"retry_exponential_backoff": 0,
"trigger_rule": DEFAULT_TRIGGER_RULE,
"wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
"wait_for_downstream": False,
Expand Down Expand Up @@ -269,7 +269,7 @@ def partial(
execution_timeout: timedelta | None = ...,
max_retry_delay: None | timedelta | float = ...,
retry_delay: timedelta | float = ...,
retry_exponential_backoff: bool = ...,
retry_exponential_backoff: float = ...,
priority_weight: int = ...,
weight_rule: str | PriorityWeightStrategy = ...,
sla: timedelta | None = ...,
Expand Down Expand Up @@ -569,7 +569,7 @@ def __new__(cls, name, bases, namespace, **kwargs):
"email_on_retry": bool,
"email_on_failure": bool,
"retries": int,
"retry_exponential_backoff": bool,
"retry_exponential_backoff": (int, float),
"depends_on_past": bool,
"ignore_first_depends_on_past": bool,
"wait_for_past_depends_before_skipping": bool,
Expand Down Expand Up @@ -648,9 +648,10 @@ class derived from this one results in the creation of a task object,
:param retry_delay: delay between retries, can be set as ``timedelta`` or
``float`` seconds, which will be converted into ``timedelta``,
the default is ``timedelta(seconds=300)``.
:param retry_exponential_backoff: allow progressively longer waits between
retries by using exponential backoff algorithm on retry delay (delay
will be converted into seconds)
:param retry_exponential_backoff: multiplier for exponential backoff between retries.
Set to 0 to disable (constant delay). Set to 2.0 for standard exponential backoff
(delay doubles with each retry). For example, with retry_delay=4min and
retry_exponential_backoff=5, retries occur after 4min, 20min, 100min, etc.
:param max_retry_delay: maximum delay interval between retries, can be set as
``timedelta`` or ``float`` seconds, which will be converted into ``timedelta``.
:param start_date: The ``start_date`` for the task, determines
Expand Down Expand Up @@ -827,7 +828,7 @@ def say_hello_world(**context):
email_on_failure: bool = True
retries: int | None = DEFAULT_RETRIES
retry_delay: timedelta = DEFAULT_RETRY_DELAY
retry_exponential_backoff: bool = False
retry_exponential_backoff: float = 0
max_retry_delay: timedelta | float | None = None
start_date: datetime | None = None
end_date: datetime | None = None
Expand Down Expand Up @@ -985,7 +986,7 @@ def __init__(
email_on_failure: bool = True,
retries: int | None = DEFAULT_RETRIES,
retry_delay: timedelta | float = DEFAULT_RETRY_DELAY,
retry_exponential_backoff: bool = False,
retry_exponential_backoff: float = 0,
max_retry_delay: timedelta | float | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
Expand Down
Loading
Loading