diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 66975f57815ea..6655171cf3bc7 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1263,7 +1263,7 @@ def _email_alert(*, task_instance: TaskInstance, exception, task: BaseOperator) def _get_email_subject_content( *, - task_instance: TaskInstance, + task_instance: TaskInstance | RuntimeTaskInstanceProtocol, exception: BaseException, task: BaseOperator | None = None, ) -> tuple[str, str, str]: diff --git a/task-sdk/src/airflow/sdk/definitions/baseoperator.py b/task-sdk/src/airflow/sdk/definitions/baseoperator.py index 4467f6b418490..4d81350bc42fb 100644 --- a/task-sdk/src/airflow/sdk/definitions/baseoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/baseoperator.py @@ -1032,19 +1032,19 @@ def __init__( if email is not None: warnings.warn( - "email is deprecated please migrate to SmtpNotifier`.", + "Setting email on a task is deprecated; please migrate to SmtpNotifier.", RemovedInAirflow4Warning, stacklevel=2, ) if email and email_on_retry is not None: warnings.warn( - "email_on_retry is deprecated please migrate to SmtpNotifier`.", + "Setting email_on_retry on a task is deprecated; please migrate to SmtpNotifier.", RemovedInAirflow4Warning, stacklevel=2, ) if email and email_on_failure is not None: warnings.warn( - "email_on_failure is deprecated please migrate to SmtpNotifier`.", + "Setting email_on_failure on a task is deprecated; please migrate to SmtpNotifier.", RemovedInAirflow4Warning, stacklevel=2, ) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 31c24c3332495..29b16ee5dc730 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -857,6 +857,17 @@ def _run_task_state_change_callbacks( log.exception("Failed to run task callback", kind=kind, index=i, callback=callback) +def _send_task_error_email(to: Iterable[str], ti: RuntimeTaskInstance, exception: BaseException) -> None: + from airflow.models.taskinstance import _get_email_subject_content + from airflow.utils.email import send_email + + subject, content, err = _get_email_subject_content(task_instance=ti, exception=exception) + try: + send_email(to, subject, content) + except Exception: + send_email(to, subject, err) + + def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger): """Execute Task (optionally with a Timeout) and push Xcom results.""" from airflow.exceptions import AirflowTaskTimeout @@ -994,11 +1005,15 @@ def finalize( get_listener_manager().hook.on_task_instance_failed( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) + if error and task.email_on_retry and task.email: + _send_task_error_email(task.email, ti, error) elif state == TerminalTIState.FAILED: _run_task_state_change_callbacks(task, "on_failure_callback", context, log) get_listener_manager().hook.on_task_instance_failed( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) + if error and task.email_on_failure and task.email: + _send_task_error_email(task.email, ti, error) get_listener_manager().hook.before_stopping(component=TaskRunnerMarker())