diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 122b2fd0768a8..0dd08e3a6b7b2 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -64,7 +64,7 @@
XComSequenceSliceResult,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess
-from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_task_error_email
+from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification
from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
from airflow.stats import Stats
from airflow.utils.file import iter_airflow_imports
@@ -457,7 +457,9 @@ def _execute_email_callbacks(dagbag: DagBag, request: EmailRequest, log: Filteri
)
try:
- _send_task_error_email(task.email, runtime_ti, request.msg, log)
+ context = runtime_ti.get_template_context()
+ error = Exception(request.msg) if request.msg else None
+ _send_error_email_notification(task, runtime_ti, context, error, log)
except Exception:
log.exception(
"Failed to send %s email",
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 099b14e25113b..398bd5fcdabba 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -26,7 +26,6 @@
from collections.abc import Callable
from socket import socketpair
from typing import TYPE_CHECKING, BinaryIO
-from unittest import mock
from unittest.mock import MagicMock, patch
import pytest
@@ -79,6 +78,7 @@
XComResult,
XComSequenceSliceResult,
)
+from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState
@@ -1468,12 +1468,12 @@ def fake_collect_dags(self, *args, **kwargs):
class TestExecuteEmailCallbacks:
"""Test the email callback execution functionality."""
- @patch("airflow.dag_processing.processor._send_task_error_email")
+ @patch("airflow.dag_processing.processor._send_error_email_notification")
def test_execute_email_callbacks_failure(self, mock_send_email):
"""Test email callback execution for task failure."""
dagbag = MagicMock(spec=DagBag)
with DAG(dag_id="test_dag") as dag:
- BaseOperator(task_id="test_task", email="test@example.com")
+ task = BaseOperator(task_id="test_task", email="test@example.com")
dagbag.dags = {"test_dag": dag}
# Create TI data
@@ -1517,24 +1517,34 @@ def test_execute_email_callbacks_failure(self, mock_send_email):
)
log = MagicMock(spec=FilteringBoundLogger)
+ runtime_ti = RuntimeTaskInstance.model_construct(
+ **request.ti.model_dump(exclude_unset=True),
+ task=task,
+ _ti_context_from_server=request.context_from_server,
+ max_tries=request.context_from_server.max_tries,
+ )
# Execute email callbacks
_execute_email_callbacks(dagbag, request, log)
# Verify email was sent
- mock_send_email.assert_called_once_with(
- "test@example.com",
- mock.ANY, # mocked Runtime TI
- "Task failed",
- log,
- )
-
- @patch("airflow.dag_processing.processor._send_task_error_email")
+ mock_send_email.assert_called_once()
+ call_args = mock_send_email.call_args[0]
+
+ assert call_args[0] == task
+ assert call_args[1].task_id == runtime_ti.task_id
+ assert call_args[1].dag_id == runtime_ti.dag_id
+ assert call_args[2] is not None # context
+ assert isinstance(call_args[3], Exception)
+ assert call_args[3].args[0] == request.msg
+ assert call_args[4] == log
+
+ @patch("airflow.dag_processing.processor._send_error_email_notification")
def test_execute_email_callbacks_retry(self, mock_send_email):
"""Test email callback execution for task retry."""
dagbag = MagicMock(spec=DagBag)
with DAG(dag_id="test_dag") as dag:
- BaseOperator(task_id="test_task", email=["test@example.com"])
+ task = BaseOperator(task_id="test_task", email=["test@example.com"])
dagbag.dags = {"test_dag": dag}
ti_data = TIDataModel(
@@ -1578,19 +1588,28 @@ def test_execute_email_callbacks_retry(self, mock_send_email):
)
log = MagicMock(spec=FilteringBoundLogger)
+ runtime_ti = RuntimeTaskInstance.model_construct(
+ **request.ti.model_dump(exclude_unset=True),
+ task=task,
+ _ti_context_from_server=request.context_from_server,
+ max_tries=request.context_from_server.max_tries,
+ )
# Execute email callbacks
_execute_email_callbacks(dagbag, request, log)
- # Verify email was sent
- mock_send_email.assert_called_once_with(
- ["test@example.com"],
- mock.ANY, # mocked Runtime TI
- "Task retry",
- log,
- )
+ mock_send_email.assert_called_once()
+ call_args = mock_send_email.call_args[0]
+
+ assert call_args[0] == task
+ assert call_args[1].task_id == runtime_ti.task_id
+ assert call_args[1].dag_id == runtime_ti.dag_id
+ assert call_args[2] is not None # context
+ assert isinstance(call_args[3], Exception)
+ assert call_args[3].args[0] == request.msg
+ assert call_args[4] == log
- @patch("airflow.dag_processing.processor._send_task_error_email")
+ @patch("airflow.dag_processing.processor._send_error_email_notification")
def test_execute_email_callbacks_no_email_configured(self, mock_send_email):
"""Test email callback when no email is configured."""
dagbag = MagicMock(spec=DagBag)
@@ -1647,8 +1666,7 @@ def test_execute_email_callbacks_no_email_configured(self, mock_send_email):
assert "Email callback requested but no email configured" in warning_call
mock_send_email.assert_not_called()
- @patch("airflow.dag_processing.processor._send_task_error_email")
- def test_execute_email_callbacks_email_disabled_for_type(self, mock_send_email):
+ def test_execute_email_callbacks_email_disabled_for_type(self):
"""Test email callback when email is disabled for the specific type."""
dagbag = MagicMock(spec=DagBag)
with DAG(dag_id="test_dag") as dag:
@@ -1700,9 +1718,6 @@ def test_execute_email_callbacks_email_disabled_for_type(self, mock_send_email):
# Execute email callbacks
_execute_email_callbacks(dagbag, request, log)
- # Verify no email was sent
- mock_send_email.assert_not_called()
-
# Verify info log about email being disabled
log.info.assert_called_once()
info_call = log.info.call_args[0][0]
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 3c512be9cd594..8ca403f3c96ca 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1204,99 +1204,78 @@ def _run_task_state_change_callbacks(
log.exception("Failed to run task callback", kind=kind, index=i, callback=callback)
-def _get_email_subject_content(
- *,
- task_instance: RuntimeTaskInstance,
- exception: BaseException | str | None,
+def _send_error_email_notification(
+ task: BaseOperator | MappedOperator,
+ ti: RuntimeTaskInstance,
+ context: Context,
+ error: BaseException | str | None,
log: Logger,
-) -> tuple[str, str, str]:
- """
- Get the email subject content for exceptions.
-
- :param task_instance: the task instance
- :param exception: the exception sent in the email
- :param task:
-
- :meta private:
- """
- from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
- from airflow.sdk.definitions.context import Context, render_template_to_string
-
- exception_html = str(exception).replace("\n", "
")
-
- default_subject = "Airflow alert: {{ti}}"
- # For reporting purposes, we report based on 1-indexed,
- # not 0-indexed lists (i.e. Try 1 instead of
- # Try 0 for the first attempt).
- default_html_content = (
- "Try {{try_number}} out of {{max_tries + 1}}
"
- "Exception:
{{exception_html}}
"
- 'Log: Link
'
- "Host: {{ti.hostname}}
"
- 'Mark success: Link
'
- )
+) -> None:
+ """Send email notification for task errors using SmtpNotifier."""
+ try:
+ from airflow.providers.smtp.notifications.smtp import SmtpNotifier
+ except ImportError:
+ log.error(
+ "Failed to send task failure or retry email notification: "
+ "`apache-airflow-providers-smtp` is not installed. "
+ "Install this provider to enable email notifications."
+ )
+ return
- default_html_content_err = (
- "Try {{try_number}} out of {{max_tries + 1}}
"
- "Exception:
Failed attempt to attach error logs
"
- 'Log: Link
'
- "Host: {{ti.hostname}}
"
- 'Mark success: Link
'
- )
+ if not task.email:
+ return
- additional_context: dict[str, Any] = {
- "exception": exception,
- "exception_html": exception_html,
- "try_number": task_instance.try_number,
- "max_tries": task_instance.max_tries,
- }
+ subject_template_file = conf.get("email", "subject_template", fallback=None)
- # Use the Dag's get_template_env() to set force_sandboxed. Don't add
- # the flag to the function on task object -- that function can be
- # overridden, and adding a flag breaks backward compatibility.
- dag = task_instance.task.get_dag()
- if dag:
- jinja_env = dag.get_template_env(force_sandboxed=True)
+ # Read the template file if configured
+ if subject_template_file and Path(subject_template_file).exists():
+ subject = Path(subject_template_file).read_text()
else:
- jinja_env = SandboxedEnvironment(cache_size=0)
- jinja_context = task_instance.get_template_context()
- if not jinja_context:
- jinja_context = Context()
- # Add additional fields to the context for email template rendering
- jinja_context.update(additional_context) # type: ignore[typeddict-item]
-
- def render(key: str, content: str) -> str:
- if conf.has_option("email", key):
- path = conf.get_mandatory_value("email", key)
- try:
- with open(path) as f:
- content = f.read()
- except FileNotFoundError:
- log.warning("Could not find email template file. Using defaults...", file=path)
- except OSError:
- log.exception("Error while using email template. Using defaults...", file=path)
- return render_template_to_string(jinja_env.from_string(content), jinja_context)
+ # Fallback to default
+ subject = "Airflow alert: {{ti}}"
- subject = render("subject_template", default_subject)
- html_content = render("html_content_template", default_html_content)
- html_content_err = render("html_content_template", default_html_content_err)
+ html_content_template_file = conf.get("email", "html_content_template", fallback=None)
- return subject, html_content, html_content_err
+ # Read the template file if configured
+ if html_content_template_file and Path(html_content_template_file).exists():
+ html_content = Path(html_content_template_file).read_text()
+ else:
+ # Fallback to default
+ # For reporting purposes, we report based on 1-indexed,
+ # not 0-indexed lists (i.e. Try 1 instead of Try 0 for the first attempt).
+ html_content = (
+ "Try {{try_number}} out of {{max_tries + 1}}
"
+ "Exception:
{{exception_html}}
"
+ 'Log: Link
'
+ "Host: {{ti.hostname}}
"
+ 'Mark success: Link
'
+ )
+ # Add exception_html to context for template rendering
+ import html
-def _send_task_error_email(
- to: Iterable[str],
- ti: RuntimeTaskInstance,
- exception: BaseException | str | None,
- log: Logger,
-) -> None:
- from airflow.utils.email import send_email
+ exception_html = html.escape(str(error)).replace("\n", "
")
+ additional_context = {
+ "exception": error,
+ "exception_html": exception_html,
+ "try_number": ti.try_number,
+ "max_tries": ti.max_tries,
+ }
+ email_context = {**context, **additional_context}
+ to_emails = task.email
+ if not to_emails:
+ return
- subject, content, err = _get_email_subject_content(task_instance=ti, exception=exception, log=log)
try:
- send_email(to, subject, content)
+ notifier = SmtpNotifier(
+ to=to_emails,
+ subject=subject,
+ html_content=html_content,
+ from_email=conf.get("email", "from_email", fallback="airflow@airflow"),
+ )
+ notifier(email_context)
except Exception:
- send_email(to, subject, err)
+ log.exception("Failed to send email notification")
def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
@@ -1457,7 +1436,7 @@ def finalize(
except Exception:
log.exception("error calling listener")
if error and task.email_on_retry and task.email:
- _send_task_error_email(task.email, ti, error, log)
+ _send_error_email_notification(task, ti, context, error, log)
elif state == TaskInstanceState.FAILED:
_run_task_state_change_callbacks(task, "on_failure_callback", context, log)
try:
@@ -1467,7 +1446,7 @@ def finalize(
except Exception:
log.exception("error calling listener")
if error and task.email_on_failure and task.email:
- _send_task_error_email(task.email, ti, error, log)
+ _send_error_email_notification(task, ti, context, error, log)
try:
get_listener_manager().hook.before_stopping(component=TaskRunnerMarker())
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 1615efae917ae..0603e3767338b 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -126,6 +126,7 @@
)
from airflow.sdk.execution_time.xcom import XCom
+from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.mock_operators import AirflowLink
if TYPE_CHECKING:
@@ -2371,6 +2372,173 @@ def mock_send_side_effect(*args, **kwargs):
)
+class TestEmailNotifications:
+ FROM = "from@airflow"
+
+ @pytest.mark.parametrize(
+ "emails, sent",
+ [
+ pytest.param(
+ "test@example.com",
+ True,
+ id="one-email",
+ ),
+ pytest.param(
+ ["test@example.com"],
+ True,
+ id="one-email-as-list",
+ ),
+ pytest.param(
+ ["test@example.com", "test2@example.com"],
+ True,
+ id="multiple-email-as-list",
+ ),
+ pytest.param(None, False, id="no-email"),
+ pytest.param([], False, id="no-email-as-list"),
+ ],
+ )
+ def test_email_on_retry(self, emails, sent, create_runtime_ti, mock_supervisor_comms):
+ """Test email notification on task retry."""
+ from airflow.sdk.execution_time.task_runner import finalize, run
+
+ class ZeroDivsionOperator(BaseOperator):
+ def execute(self, context):
+ 1 // 0
+
+ task = ZeroDivsionOperator(
+ task_id="divide_by_zero_task",
+ email=emails,
+ email_on_retry=True,
+ retries=2,
+ )
+
+ runtime_ti = create_runtime_ti(task=task)
+ context = runtime_ti.get_template_context()
+ log = mock.MagicMock()
+
+ with conf_vars({("email", "from_email"): self.FROM}):
+ with mock.patch("airflow.providers.smtp.notifications.smtp.SmtpNotifier") as mock_smtp_notifier:
+ state, _, error = run(runtime_ti, context, log)
+ finalize(runtime_ti, state, context, log, error)
+
+ if not sent:
+ mock_smtp_notifier.assert_not_called()
+ else:
+ mock_smtp_notifier.assert_called_once()
+ kwargs = mock_smtp_notifier.call_args.kwargs
+ assert kwargs["from_email"] == self.FROM
+ assert kwargs["to"] == emails
+ assert (
+ kwargs["html_content"]
+ == 'Try {{try_number}} out of {{max_tries + 1}}
Exception:
{{exception_html}}
Log: Link
Host: {{ti.hostname}}
Mark success: Link
'
+ )
+
+ @pytest.mark.parametrize(
+ "emails, sent",
+ [
+ pytest.param(
+ "test@example.com",
+ True,
+ id="one-email",
+ ),
+ pytest.param(
+ ["test@example.com"],
+ True,
+ id="one-email-as-list",
+ ),
+ pytest.param(
+ ["test@example.com", "test2@example.com"],
+ True,
+ id="multiple-email-as-list",
+ ),
+ pytest.param(None, False, id="no-email"),
+ pytest.param([], False, id="no-email-as-list"),
+ ],
+ )
+ def test_email_on_failure(self, emails, sent, create_runtime_ti, mock_supervisor_comms):
+ """Test email notification on task failure."""
+ from airflow.exceptions import AirflowFailException
+ from airflow.sdk.execution_time.task_runner import finalize, run
+
+ class FailingOperator(BaseOperator):
+ def execute(self, context):
+ raise AirflowFailException("Task failed on purpose")
+
+ task = FailingOperator(
+ task_id="failing_task",
+ email=emails,
+ email_on_failure=True,
+ )
+
+ runtime_ti = create_runtime_ti(task=task)
+ context = runtime_ti.get_template_context()
+ log = mock.MagicMock()
+
+ with conf_vars({("email", "from_email"): self.FROM}):
+ with mock.patch("airflow.providers.smtp.notifications.smtp.SmtpNotifier") as mock_smtp_notifier:
+ state, _, error = run(runtime_ti, context, log)
+ finalize(runtime_ti, state, context, log, error)
+
+ if not sent:
+ mock_smtp_notifier.assert_not_called()
+ else:
+ mock_smtp_notifier.assert_called_once()
+ kwargs = mock_smtp_notifier.call_args.kwargs
+ assert kwargs["from_email"] == self.FROM
+ assert kwargs["to"] == emails
+ assert (
+ kwargs["html_content"]
+ == 'Try {{try_number}} out of {{max_tries + 1}}
Exception:
{{exception_html}}
Log: Link
Host: {{ti.hostname}}
Mark success: Link
'
+ )
+
+ def test_email_with_custom_templates(self, create_runtime_ti, mock_supervisor_comms, tmp_path):
+ """Test email notification respects custom subject and html_content templates."""
+ from airflow.exceptions import AirflowFailException
+
+ subject_template = tmp_path / "custom_subject.jinja2"
+ html_template = tmp_path / "custom_html.html"
+
+ subject_template.write_text("Custom Subject: Task {{ti.task_id}} Failed\n")
+ html_template.write_text(
+ "
Task: {{ti.task_id}}
Error: {{exception_html}}
" + ) + + class FailingOperator(BaseOperator): + def execute(self, context): + raise AirflowFailException("Task failed for template test") + + task = FailingOperator( + task_id="template_test_task", + email=["test@example.com"], + email_on_failure=True, + ) + + runtime_ti = create_runtime_ti(task=task) + context = runtime_ti.get_template_context() + log = mock.MagicMock() + + with conf_vars( + { + ("email", "subject_template"): str(subject_template), + ("email", "html_content_template"): str(html_template), + ("email", "from_email"): self.FROM, + } + ): + with mock.patch("airflow.providers.smtp.notifications.smtp.SmtpNotifier") as mock_smtp_notifier: + state, _, error = run(runtime_ti, context, log) + finalize(runtime_ti, state, context, log, error) + + mock_smtp_notifier.assert_called_once() + kwargs = mock_smtp_notifier.call_args.kwargs + + assert kwargs["subject"] == "Custom Subject: Task {{ti.task_id}} Failed\n" + assert ( + kwargs["html_content"] + == "Task: {{ti.task_id}}
Error: {{exception_html}}
" + ) + assert kwargs["from_email"] == self.FROM + + class TestDagParamRuntime: DEFAULT_ARGS = { "owner": "test",