Skip to content
Merged
23 changes: 23 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@

from __future__ import annotations

import logging
import traceback
from abc import ABC, abstractmethod
from enum import Enum
from typing import Generic, TypeVar

from fastapi import HTTPException, Request, status
from sqlalchemy.exc import IntegrityError

from airflow.configuration import conf
from airflow.utils.strings import get_random_string

T = TypeVar("T", bound=Exception)

log = logging.getLogger(__name__)


class BaseErrorHandler(Generic[T], ABC):
"""Base class for error handlers."""
Expand Down Expand Up @@ -61,12 +68,28 @@ def __init__(self):
def exception_handler(self, request: Request, exc: IntegrityError):
    """Handle IntegrityError for the matched dialect by raising a 409 with a safe message.

    Logs the full stacktrace under a random correlation id; the HTTP response
    only contains the stacktrace when ``[api] expose_stacktrace`` is enabled,
    otherwise it carries the id so operators can find the trace in the logs.

    :param request: the incoming request (unused, required by the handler signature)
    :param exc: the IntegrityError raised by SQLAlchemy
    :raises HTTPException: 409 CONFLICT describing the unique constraint violation
    """
    if self._is_dialect_matched(exc):
        # Correlation id lets a user match the (possibly redacted) API
        # response with the full stacktrace in the server logs.
        exception_id = get_random_string()
        # join() builds the trace in one pass instead of repeated += concatenation.
        stacktrace = "".join(traceback.format_tb(exc.__traceback__))

        log_message = f"Error with id {exception_id}\n{stacktrace}"
        log.error(log_message)
        # Use getboolean so "true"/"TRUE"/"1" are all accepted; comparing the raw
        # string against the literal "True" silently misses valid truthy values.
        if conf.getboolean("api", "expose_stacktrace", fallback=False):
            message = log_message
        else:
            message = (
                "Serious error when handling your request. Check logs for more details - "
                f"you will find it in api server when you look for ID {exception_id}"
            )

        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail={
                "reason": "Unique constraint violation",
                "statement": str(exc.statement),
                "orig_error": str(exc.orig),
                "message": message,
            },
        )

Expand Down
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,12 @@ api:
type: string
example: ~
default: "False"
expose_stacktrace:
description: Expose the stacktrace of API server errors in the error response. If False, the response only contains an error ID referencing the full stacktrace in the API server logs.
version_added: ~
type: string
example: ~
default: "False"
base_url:
description: |
The base url of the API server. Airflow cannot guess what domain or CNAME you are using.
Expand Down
35 changes: 33 additions & 2 deletions airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from unittest.mock import patch

import pytest
from fastapi import HTTPException, status
from sqlalchemy.exc import IntegrityError
Expand All @@ -26,6 +28,7 @@
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_connections, clear_db_dags, clear_db_pools, clear_db_runs

pytestmark = pytest.mark.db_test
Expand All @@ -50,6 +53,11 @@
"reason": f"Test for {_DatabaseDialect.POSTGRES.value} only",
},
]
MOCKED_ID = "TgVcT3QW"
MESSAGE = (
"Serious error when handling your request. Check logs for more details - "
f"you will find it in api server when you look for ID {MOCKED_ID}"
)


def generate_test_cases_parametrize(
Expand Down Expand Up @@ -109,6 +117,7 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred) VALUES (?, ?, ?, ?)",
"orig_error": "UNIQUE constraint failed: slot_pool.pool",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -117,6 +126,7 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred) VALUES (%s, %s, %s, %s)",
"orig_error": "(1062, \"Duplicate entry 'test_pool' for key 'slot_pool.slot_pool_pool_uq'\")",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -125,6 +135,7 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred) VALUES (%(pool)s, %(slots)s, %(description)s, %(include_deferred)s) RETURNING slot_pool.id",
"orig_error": 'duplicate key value violates unique constraint "slot_pool_pool_uq"\nDETAIL: Key (pool)=(test_pool) already exists.\n',
"message": MESSAGE,
},
),
],
Expand All @@ -135,6 +146,7 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": 'INSERT INTO variable ("key", val, description, is_encrypted) VALUES (?, ?, ?, ?)',
"orig_error": "UNIQUE constraint failed: variable.key",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -143,6 +155,7 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO variable (`key`, val, description, is_encrypted) VALUES (%s, %s, %s, %s)",
"orig_error": "(1062, \"Duplicate entry 'test_key' for key 'variable.variable_key_uq'\")",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -151,14 +164,23 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO variable (key, val, description, is_encrypted) VALUES (%(key)s, %(val)s, %(description)s, %(is_encrypted)s) RETURNING variable.id",
"orig_error": 'duplicate key value violates unique constraint "variable_key_uq"\nDETAIL: Key (key)=(test_key) already exists.\n',
"message": MESSAGE,
},
),
],
],
),
)
@patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
@conf_vars({("api", "expose_stacktrace"): "False"})
@provide_session
def test_handle_single_column_unique_constraint_error(self, session, table, expected_exception) -> None:
def test_handle_single_column_unique_constraint_error(
self,
mock_get_random_string,
session,
table,
expected_exception,
) -> None:
# Take Pool and Variable tables as test cases
if table == "Pool":
session.add(Pool(pool=TEST_POOL, slots=1, description="test pool", include_deferred=False))
Expand Down Expand Up @@ -188,6 +210,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe
"reason": "Unique constraint violation",
"statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?, ?, ?)",
"orig_error": "UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -196,6 +219,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe
"reason": "Unique constraint violation",
"statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s, %s, %s)",
"orig_error": "(1062, \"Duplicate entry 'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -204,15 +228,22 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe
"reason": "Unique constraint violation",
"statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(run_after)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(bundle_version)s, %(scheduled_by_job_id)s, %(context_carrier)s, %(created_dag_version_id)s) RETURNING dag_run.id",
"orig_error": 'duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"\nDETAIL: Key (dag_id, run_id)=(test_dag_id, test_run_id) already exists.\n',
"message": MESSAGE,
},
),
],
],
),
)
@patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
@conf_vars({("api", "expose_stacktrace"): "False"})
@provide_session
def test_handle_multiple_columns_unique_constraint_error(
self, session, table, expected_exception
self,
mock_get_random_string,
session,
table,
expected_exception,
) -> None:
if table == "DagRun":
session.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def test_post_should_respond_already_exist(self, test_client, body):
assert response.status_code == 409
response_json = response.json()
assert "detail" in response_json
assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"]
assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"]

@pytest.mark.enable_redact
@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@
from airflow.utils.types import DagRunTriggeredByType, DagRunType

from tests_common.test_utils.api_fastapi import _check_dag_run_note, _check_last_log
from tests_common.test_utils.db import (
clear_db_dags,
clear_db_logs,
clear_db_runs,
clear_db_serialized_dags,
)
from tests_common.test_utils.db import clear_db_dags, clear_db_logs, clear_db_runs, clear_db_serialized_dags
from tests_common.test_utils.format_datetime import from_datetime_to_zulu, from_datetime_to_zulu_without_ms

if TYPE_CHECKING:
Expand Down Expand Up @@ -1577,7 +1572,7 @@ def test_response_409(self, test_client):
assert response.status_code == 409
response_json = response.json()
assert "detail" in response_json
assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"]
assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"]

@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_should_respond_200_with_null_logical_date(self, test_client):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def test_should_response_409(
else:
response_json = response.json()
assert "detail" in response_json
assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"]
assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"]

assert session.query(Pool).count() == n_pools + 1

Expand Down