diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py index 7f6d6cf297a42..e12e71995dc8e 100644 --- a/task-sdk/src/airflow/sdk/bases/operator.py +++ b/task-sdk/src/airflow/sdk/bases/operator.py @@ -38,6 +38,8 @@ from airflow.sdk import TriggerRule, timezone from airflow.sdk._shared.secrets_masker import redact from airflow.sdk.definitions._internal.abstractoperator import ( + DEFAULT_EMAIL_ON_FAILURE, + DEFAULT_EMAIL_ON_RETRY, DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, DEFAULT_OWNER, DEFAULT_POOL_NAME, @@ -219,8 +221,8 @@ def partial(**kwargs): OPERATOR_DEFAULTS: dict[str, Any] = { "allow_nested_operators": True, "depends_on_past": False, - "email_on_failure": True, - "email_on_retry": True, + "email_on_failure": DEFAULT_EMAIL_ON_FAILURE, + "email_on_retry": DEFAULT_EMAIL_ON_RETRY, "execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT, # "executor": DEFAULT_EXECUTOR, "executor_config": {}, @@ -826,8 +828,8 @@ def say_hello_world(**context): task_id: str owner: str = DEFAULT_OWNER email: str | Sequence[str] | None = None - email_on_retry: bool = True - email_on_failure: bool = True + email_on_retry: bool = DEFAULT_EMAIL_ON_RETRY + email_on_failure: bool = DEFAULT_EMAIL_ON_FAILURE retries: int | None = DEFAULT_RETRIES retry_delay: timedelta = DEFAULT_RETRY_DELAY retry_exponential_backoff: float = 0 @@ -984,8 +986,8 @@ def __init__( task_id: str, owner: str = DEFAULT_OWNER, email: str | Sequence[str] | None = None, - email_on_retry: bool = True, - email_on_failure: bool = True, + email_on_retry: bool = DEFAULT_EMAIL_ON_RETRY, + email_on_failure: bool = DEFAULT_EMAIL_ON_FAILURE, retries: int | None = DEFAULT_RETRIES, retry_delay: timedelta | float = DEFAULT_RETRY_DELAY, retry_exponential_backoff: float = 0, diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index b4503b4177b9c..6c99a72b22080 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -76,7 +76,8 @@ DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta( "core", "default_task_execution_timeout" ) - +DEFAULT_EMAIL_ON_FAILURE: bool = conf.getboolean("email", "default_email_on_failure", fallback=True) +DEFAULT_EMAIL_ON_RETRY: bool = conf.getboolean("email", "default_email_on_retry", fallback=True) log = logging.getLogger(__name__)