Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions task-sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ dependencies = [
"python-dateutil>=2.7.0",
"psutil>=6.1.0",
"structlog>=25.4.0",
"retryhttp>=1.2.0,!=1.3.0",
"greenback>=1.2.1",
# Requests is known to introduce breaking changes, so we pin it to a specific range
"requests>=2.31.0,<3",
"types-requests>=2.31.0",
"tenacity>=8.3.0",
# Start of shared timezones dependencies
"pendulum>=3.1.0",
Expand Down
29 changes: 19 additions & 10 deletions task-sdk/src/airflow/sdk/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@
import msgspec
import structlog
from pydantic import BaseModel
from retryhttp import retry, wait_retry_after
from tenacity import before_log, wait_random_exponential
from tenacity import (
before_log,
retry,
retry_if_exception,
stop_after_attempt,
wait_random_exponential,
)
from uuid6 import uuid7

from airflow.configuration import conf
Expand Down Expand Up @@ -812,6 +817,14 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
API_SSL_CERT_PATH = conf.get("api", "ssl_cert")


def _should_retry_api_request(exception: BaseException) -> bool:
"""Determine if an API request should be retried based on the exception type."""
if isinstance(exception, httpx.HTTPStatusError):
return exception.response.status_code >= 500

return isinstance(exception, httpx.RequestError)


class Client(httpx.Client):
def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, **kwargs: Any):
if (not base_url) ^ dry_run:
Expand Down Expand Up @@ -840,21 +853,17 @@ def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, *
**kwargs,
)

_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)

def _update_auth(self, response: httpx.Response):
if new_token := response.headers.get("Refreshed-API-Token"):
log.debug("Execution API issued us a refreshed Task token")
self.auth = BearerAuth(new_token)

@retry(
reraise=True,
max_attempt_number=API_RETRIES,
wait_server_errors=_default_wait,
wait_network_errors=_default_wait,
wait_timeouts=_default_wait,
wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429
retry=retry_if_exception(_should_retry_api_request),
stop=stop_after_attempt(API_RETRIES),
wait=wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX),
before_sleep=before_log(log, logging.WARNING),
reraise=True,
)
def request(self, *args, **kwargs):
"""Implement a convenience for httpx.Client.request with a retry layer."""
Expand Down
Loading