Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ def _email_alert(*, task_instance: TaskInstance, exception, task: BaseOperator)

def _get_email_subject_content(
*,
task_instance: TaskInstance,
task_instance: TaskInstance | RuntimeTaskInstanceProtocol,
exception: BaseException,
task: BaseOperator | None = None,
) -> tuple[str, str, str]:
Expand Down
6 changes: 3 additions & 3 deletions task-sdk/src/airflow/sdk/definitions/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,19 +1032,19 @@ def __init__(

if email is not None:
warnings.warn(
"email is deprecated please migrate to SmtpNotifier`.",
"Setting email on a task is deprecated; please migrate to SmtpNotifier.",
RemovedInAirflow4Warning,
stacklevel=2,
)
if email and email_on_retry is not None:
warnings.warn(
"email_on_retry is deprecated please migrate to SmtpNotifier`.",
"Setting email_on_retry on a task is deprecated; please migrate to SmtpNotifier.",
RemovedInAirflow4Warning,
stacklevel=2,
)
if email and email_on_failure is not None:
warnings.warn(
"email_on_failure is deprecated please migrate to SmtpNotifier`.",
"Setting email_on_failure on a task is deprecated; please migrate to SmtpNotifier.",
RemovedInAirflow4Warning,
stacklevel=2,
)
Expand Down
15 changes: 15 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,17 @@ def _run_task_state_change_callbacks(
log.exception("Failed to run task callback", kind=kind, index=i, callback=callback)


def _send_task_error_email(to: Iterable[str], ti: RuntimeTaskInstance, exception: BaseException) -> None:
from airflow.models.taskinstance import _get_email_subject_content
from airflow.utils.email import send_email

subject, content, err = _get_email_subject_content(task_instance=ti, exception=exception)
try:
send_email(to, subject, content)
except Exception:
send_email(to, subject, err)


def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
"""Execute Task (optionally with a Timeout) and push Xcom results."""
from airflow.exceptions import AirflowTaskTimeout
Expand Down Expand Up @@ -994,11 +1005,15 @@ def finalize(
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
)
if error and task.email_on_retry and task.email:
_send_task_error_email(task.email, ti, error)
elif state == TerminalTIState.FAILED:
_run_task_state_change_callbacks(task, "on_failure_callback", context, log)
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
)
if error and task.email_on_failure and task.email:
_send_task_error_email(task.email, ti, error)

get_listener_manager().hook.before_stopping(component=TaskRunnerMarker())

Expand Down