Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
XComSequenceSliceResult,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_task_error_email
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification
from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
from airflow.stats import Stats
from airflow.utils.file import iter_airflow_imports
Expand Down Expand Up @@ -457,7 +457,9 @@ def _execute_email_callbacks(dagbag: DagBag, request: EmailRequest, log: Filteri
)

try:
_send_task_error_email(task.email, runtime_ti, request.msg, log)
context = runtime_ti.get_template_context()
error = Exception(request.msg) if request.msg else None
_send_error_email_notification(task, runtime_ti, context, error, log)
except Exception:
log.exception(
"Failed to send %s email",
Expand Down
65 changes: 40 additions & 25 deletions airflow-core/tests/unit/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from collections.abc import Callable
from socket import socketpair
from typing import TYPE_CHECKING, BinaryIO
from unittest import mock
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -79,6 +78,7 @@
XComResult,
XComSequenceSliceResult,
)
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

Expand Down Expand Up @@ -1468,12 +1468,12 @@ def fake_collect_dags(self, *args, **kwargs):
class TestExecuteEmailCallbacks:
"""Test the email callback execution functionality."""

@patch("airflow.dag_processing.processor._send_task_error_email")
@patch("airflow.dag_processing.processor._send_error_email_notification")
def test_execute_email_callbacks_failure(self, mock_send_email):
"""Test email callback execution for task failure."""
dagbag = MagicMock(spec=DagBag)
with DAG(dag_id="test_dag") as dag:
BaseOperator(task_id="test_task", email="test@example.com")
task = BaseOperator(task_id="test_task", email="test@example.com")
dagbag.dags = {"test_dag": dag}

# Create TI data
Expand Down Expand Up @@ -1517,24 +1517,34 @@ def test_execute_email_callbacks_failure(self, mock_send_email):
)

log = MagicMock(spec=FilteringBoundLogger)
runtime_ti = RuntimeTaskInstance.model_construct(
**request.ti.model_dump(exclude_unset=True),
task=task,
_ti_context_from_server=request.context_from_server,
max_tries=request.context_from_server.max_tries,
)

# Execute email callbacks
_execute_email_callbacks(dagbag, request, log)

# Verify email was sent
mock_send_email.assert_called_once_with(
"test@example.com",
mock.ANY, # mocked Runtime TI
"Task failed",
log,
)

@patch("airflow.dag_processing.processor._send_task_error_email")
mock_send_email.assert_called_once()
call_args = mock_send_email.call_args[0]

assert call_args[0] == task
assert call_args[1].task_id == runtime_ti.task_id
assert call_args[1].dag_id == runtime_ti.dag_id
assert call_args[2] is not None # context
assert isinstance(call_args[3], Exception)
assert call_args[3].args[0] == request.msg
assert call_args[4] == log

@patch("airflow.dag_processing.processor._send_error_email_notification")
def test_execute_email_callbacks_retry(self, mock_send_email):
"""Test email callback execution for task retry."""
dagbag = MagicMock(spec=DagBag)
with DAG(dag_id="test_dag") as dag:
BaseOperator(task_id="test_task", email=["test@example.com"])
task = BaseOperator(task_id="test_task", email=["test@example.com"])
dagbag.dags = {"test_dag": dag}

ti_data = TIDataModel(
Expand Down Expand Up @@ -1578,19 +1588,28 @@ def test_execute_email_callbacks_retry(self, mock_send_email):
)

log = MagicMock(spec=FilteringBoundLogger)
runtime_ti = RuntimeTaskInstance.model_construct(
**request.ti.model_dump(exclude_unset=True),
task=task,
_ti_context_from_server=request.context_from_server,
max_tries=request.context_from_server.max_tries,
)

# Execute email callbacks
_execute_email_callbacks(dagbag, request, log)

# Verify email was sent
mock_send_email.assert_called_once_with(
["test@example.com"],
mock.ANY, # mocked Runtime TI
"Task retry",
log,
)
mock_send_email.assert_called_once()
call_args = mock_send_email.call_args[0]

assert call_args[0] == task
assert call_args[1].task_id == runtime_ti.task_id
assert call_args[1].dag_id == runtime_ti.dag_id
assert call_args[2] is not None # context
assert isinstance(call_args[3], Exception)
assert call_args[3].args[0] == request.msg
assert call_args[4] == log

@patch("airflow.dag_processing.processor._send_task_error_email")
@patch("airflow.dag_processing.processor._send_error_email_notification")
def test_execute_email_callbacks_no_email_configured(self, mock_send_email):
"""Test email callback when no email is configured."""
dagbag = MagicMock(spec=DagBag)
Expand Down Expand Up @@ -1647,8 +1666,7 @@ def test_execute_email_callbacks_no_email_configured(self, mock_send_email):
assert "Email callback requested but no email configured" in warning_call
mock_send_email.assert_not_called()

@patch("airflow.dag_processing.processor._send_task_error_email")
def test_execute_email_callbacks_email_disabled_for_type(self, mock_send_email):
def test_execute_email_callbacks_email_disabled_for_type(self):
"""Test email callback when email is disabled for the specific type."""
dagbag = MagicMock(spec=DagBag)
with DAG(dag_id="test_dag") as dag:
Expand Down Expand Up @@ -1700,9 +1718,6 @@ def test_execute_email_callbacks_email_disabled_for_type(self, mock_send_email):
# Execute email callbacks
_execute_email_callbacks(dagbag, request, log)

# Verify no email was sent
mock_send_email.assert_not_called()

# Verify info log about email being disabled
log.info.assert_called_once()
info_call = log.info.call_args[0][0]
Expand Down
145 changes: 62 additions & 83 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,99 +1204,78 @@ def _run_task_state_change_callbacks(
log.exception("Failed to run task callback", kind=kind, index=i, callback=callback)


def _get_email_subject_content(
*,
task_instance: RuntimeTaskInstance,
exception: BaseException | str | None,
def _send_error_email_notification(
task: BaseOperator | MappedOperator,
ti: RuntimeTaskInstance,
context: Context,
error: BaseException | str | None,
log: Logger,
) -> tuple[str, str, str]:
"""
Get the email subject content for exceptions.

:param task_instance: the task instance
:param exception: the exception sent in the email
:param task:

:meta private:
"""
from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
from airflow.sdk.definitions.context import Context, render_template_to_string

exception_html = str(exception).replace("\n", "<br>")

default_subject = "Airflow alert: {{ti}}"
# For reporting purposes, we report based on 1-indexed,
# not 0-indexed lists (i.e. Try 1 instead of
# Try 0 for the first attempt).
default_html_content = (
"Try {{try_number}} out of {{max_tries + 1}}<br>"
"Exception:<br>{{exception_html}}<br>"
'Log: <a href="{{ti.log_url}}">Link</a><br>'
"Host: {{ti.hostname}}<br>"
'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
)
) -> None:
"""Send email notification for task errors using SmtpNotifier."""
try:
from airflow.providers.smtp.notifications.smtp import SmtpNotifier
except ImportError:
log.error(
"Failed to send task failure or retry email notification: "
"`apache-airflow-providers-smtp` is not installed. "
"Install this provider to enable email notifications."
)
return

default_html_content_err = (
"Try {{try_number}} out of {{max_tries + 1}}<br>"
"Exception:<br>Failed attempt to attach error logs<br>"
'Log: <a href="{{ti.log_url}}">Link</a><br>'
"Host: {{ti.hostname}}<br>"
'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
)
if not task.email:
return

additional_context: dict[str, Any] = {
"exception": exception,
"exception_html": exception_html,
"try_number": task_instance.try_number,
"max_tries": task_instance.max_tries,
}
subject_template_file = conf.get("email", "subject_template", fallback=None)

# Use the Dag's get_template_env() to set force_sandboxed. Don't add
# the flag to the function on task object -- that function can be
# overridden, and adding a flag breaks backward compatibility.
dag = task_instance.task.get_dag()
if dag:
jinja_env = dag.get_template_env(force_sandboxed=True)
# Read the template file if configured
if subject_template_file and Path(subject_template_file).exists():
subject = Path(subject_template_file).read_text()
else:
jinja_env = SandboxedEnvironment(cache_size=0)
jinja_context = task_instance.get_template_context()
if not jinja_context:
jinja_context = Context()
# Add additional fields to the context for email template rendering
jinja_context.update(additional_context) # type: ignore[typeddict-item]

def render(key: str, content: str) -> str:
if conf.has_option("email", key):
path = conf.get_mandatory_value("email", key)
try:
with open(path) as f:
content = f.read()
except FileNotFoundError:
log.warning("Could not find email template file. Using defaults...", file=path)
except OSError:
log.exception("Error while using email template. Using defaults...", file=path)
return render_template_to_string(jinja_env.from_string(content), jinja_context)
# Fallback to default
subject = "Airflow alert: {{ti}}"

subject = render("subject_template", default_subject)
html_content = render("html_content_template", default_html_content)
html_content_err = render("html_content_template", default_html_content_err)
html_content_template_file = conf.get("email", "html_content_template", fallback=None)

return subject, html_content, html_content_err
# Read the template file if configured
if html_content_template_file and Path(html_content_template_file).exists():
html_content = Path(html_content_template_file).read_text()
else:
# Fallback to default
# For reporting purposes, we report based on 1-indexed,
# not 0-indexed lists (i.e. Try 1 instead of Try 0 for the first attempt).
html_content = (
"Try {{try_number}} out of {{max_tries + 1}}<br>"
"Exception:<br>{{exception_html}}<br>"
'Log: <a href="{{ti.log_url}}">Link</a><br>'
"Host: {{ti.hostname}}<br>"
'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
)

# Add exception_html to context for template rendering
import html

def _send_task_error_email(
to: Iterable[str],
ti: RuntimeTaskInstance,
exception: BaseException | str | None,
log: Logger,
) -> None:
from airflow.utils.email import send_email
exception_html = html.escape(str(error)).replace("\n", "<br>")
additional_context = {
"exception": error,
"exception_html": exception_html,
"try_number": ti.try_number,
"max_tries": ti.max_tries,
}
email_context = {**context, **additional_context}
to_emails = task.email
if not to_emails:
return

subject, content, err = _get_email_subject_content(task_instance=ti, exception=exception, log=log)
try:
send_email(to, subject, content)
notifier = SmtpNotifier(
to=to_emails,
subject=subject,
html_content=html_content,
from_email=conf.get("email", "from_email", fallback="airflow@airflow"),
)
notifier(email_context)
except Exception:
send_email(to, subject, err)
log.exception("Failed to send email notification")


def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
Expand Down Expand Up @@ -1457,7 +1436,7 @@ def finalize(
except Exception:
log.exception("error calling listener")
if error and task.email_on_retry and task.email:
_send_task_error_email(task.email, ti, error, log)
_send_error_email_notification(task, ti, context, error, log)
elif state == TaskInstanceState.FAILED:
_run_task_state_change_callbacks(task, "on_failure_callback", context, log)
try:
Expand All @@ -1467,7 +1446,7 @@ def finalize(
except Exception:
log.exception("error calling listener")
if error and task.email_on_failure and task.email:
_send_task_error_email(task.email, ti, error, log)
_send_error_email_notification(task, ti, context, error, log)

try:
get_listener_manager().hook.before_stopping(component=TaskRunnerMarker())
Expand Down
Loading