diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index c1a58f8016161..5845fba1f1e4f 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -47,7 +47,6 @@
     AirflowClusterPolicyError,
     AirflowClusterPolicySkipDag,
     AirflowClusterPolicyViolation,
-    AirflowDagCycleException,
     AirflowDagDuplicatedIdException,
     AirflowException,
 )
@@ -67,6 +66,11 @@
 from airflow.utils.timeout import timeout
 from airflow.utils.types import NOTSET
 
+try:
+    from airflow.sdk.exceptions import AirflowDagCycleException
+except ImportError:
+    from airflow.exceptions import AirflowDagCycleException  # type: ignore[no-redef]
+
 if TYPE_CHECKING:
     from collections.abc import Generator
 
diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py
index 40634852eb6fd..db5ce959b1faf 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -499,8 +499,7 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):
     # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
     # 'READ COMMITTED' is the default value for PostgreSQL.
     # More information here:
-    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html"
-
+    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
     if SQL_ALCHEMY_CONN.startswith("mysql"):
         engine_args["isolation_level"] = "READ COMMITTED"
 
diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py
index 311198e5f0b95..b91f8e53b584f 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -24,8 +24,10 @@
 import json
 import logging
 import os
+import signal
 import sys
 import time
+import traceback
 import warnings
 from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
 from tempfile import gettempdir
@@ -97,6 +99,45 @@ class MappedClassProtocol(Protocol):
 }
 
 
+@contextlib.contextmanager
+def timeout_with_traceback(seconds, message="Operation timed out"):
+    """
+    Raise a TimeoutException after the specified number of seconds.
+
+    Logs the full call stack when the timeout occurs.
+
+    Note: This uses SIGALRM and only works on Unix systems (not Windows).
+ """ + + class TimeoutException(Exception): + """Exception raised when a timeout occurs.""" + + def timeout_handler(signum, frame): + # Capture the full call stack + stack_trace = "".join(traceback.format_stack(frame)) + + # Log the timeout and stack trace + log.error( + "\n%s after %s seconds\nFull call stack at timeout:\n%s", + message, + seconds, + stack_trace, + ) + + raise TimeoutException(message) + + # Set the signal handler + old_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(seconds) + + try: + yield + finally: + # Cancel the alarm and restore the old handler + signal.alarm(0) + signal.signal(signal.SIGALRM, old_handler) + + @provide_session def merge_conn(conn: Connection, session: Session = NEW_SESSION): """Add new Connection.""" @@ -572,20 +613,55 @@ def get_default_connections(): def _create_db_from_orm(session): + """Create database tables from ORM models and stamp alembic version.""" log.info("Creating Airflow database tables from the ORM") from alembic import command from airflow.models.base import Base + # Debug setup if requested + _setup_debug_logging_if_needed() + + log.info("Creating context") with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): + log.info("Binding engine") engine = session.get_bind().engine + log.info("Pool status: %s", engine.pool.status()) + log.info("Creating metadata") Base.metadata.create_all(engine) - # stamp the migration head + + # Stamp the migration head + log.info("Getting alembic config") config = _get_alembic_config() command.stamp(config, "head") log.info("Airflow database tables created") +def _setup_debug_logging_if_needed(): + """Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG is set.""" + if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"): + return + + import faulthandler + import threading + + # Enable SQLA debug logging + logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG) + + # Enable Fault Handler + faulthandler.enable(file=sys.stderr, all_threads=True) + + # Print Active Threads and Stack Traces Periodically + def dump_stacks(): + while True: + for thread_id, frame in sys._current_frames().items(): + log.info("\nThread %s stack:", thread_id) + traceback.print_stack(frame) + time.sleep(300) + + threading.Thread(target=dump_stacks, daemon=True).start() + + @provide_session def initdb(session: Session = NEW_SESSION): """Initialize Airflow database.""" @@ -596,10 +672,11 @@ def initdb(session: Session = NEW_SESSION): import_all_models() db_exists = _get_current_revision(session) - if db_exists: - upgradedb(session=session) - else: - _create_db_from_orm(session=session) + with timeout_with_traceback(60 * 20, "DB upgrade/creation timed out."): + if db_exists: + upgradedb(session=session) + else: + _create_db_from_orm(session=session) external_db_manager.initdb(session) # Add default pool & sync log_template diff --git a/airflow-core/tests/unit/models/test_base.py b/airflow-core/tests/unit/models/test_base.py index 5d9b6820c9d4b..b41fb5d0cc2a7 100644 --- a/airflow-core/tests/unit/models/test_base.py +++ b/airflow-core/tests/unit/models/test_base.py @@ -30,7 +30,9 @@ [ pytest.param("postgresql://host/the_database", {}, {}, id="postgres"), pytest.param("mysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql"), - pytest.param("mysql+pymsql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql+pymsql"), + pytest.param( + "mysql+pymysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql+pymysql" + ), pytest.param( 
"mysql://host/the_database", {"collation": "ascii"}, diff --git a/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py b/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py index 7fa2d6434194c..affa6edd7471e 100644 --- a/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py +++ b/providers/mysql/src/airflow/providers/mysql/hooks/mysql.py @@ -378,6 +378,8 @@ def get_uri(self) -> str: # Determine URI prefix based on client if client_name == "mysql-connector-python": uri_prefix = "mysql+mysqlconnector://" + elif client_name == "pymysql": + uri_prefix = "mysql+pymysql://" else: # default: mysqlclient uri_prefix = "mysql://" diff --git a/providers/mysql/tests/unit/mysql/hooks/test_mysql.py b/providers/mysql/tests/unit/mysql/hooks/test_mysql.py index ccab88b87a74b..ea011f3decdca 100644 --- a/providers/mysql/tests/unit/mysql/hooks/test_mysql.py +++ b/providers/mysql/tests/unit/mysql/hooks/test_mysql.py @@ -36,7 +36,11 @@ MYSQL_AVAILABLE = False from airflow.providers.mysql.hooks.mysql import MySqlHook -from airflow.utils import timezone + +try: + from airflow.sdk import timezone +except ImportError: + from airflow.utils import timezone # type: ignore[attr-defined,no-redef] from tests_common.test_utils.asserts import assert_equal_ignore_multiple_spaces @@ -125,6 +129,18 @@ def test_dummy_connection_setter(self, mock_connect): "mysql+mysqlconnector://user%40domain:password@host/schema", id="mysql_connector_python", ), + pytest.param( + { + "login": "user@domain", + "password": "password", + "host": "host", + "schema": "schema", + "port": None, + "extra": json.dumps({"client": "pymysql"}), + }, + "mysql+pymysql://user%40domain:password@host/schema", + id="mysql_connector_pymysql", + ), pytest.param( { "login": "user@domain", diff --git a/scripts/ci/docker-compose/backend-mysql.yml b/scripts/ci/docker-compose/backend-mysql.yml index f857cda9bfc7a..4295ada9dc00e 100644 --- a/scripts/ci/docker-compose/backend-mysql.yml +++ b/scripts/ci/docker-compose/backend-mysql.yml @@ -30,16 +30,20 @@ services: - MYSQL_ALLOW_EMPTY_PASSWORD=true - MYSQL_ROOT_HOST=% - MYSQL_DATABASE=airflow + - MYSQL_INITDB_SKIP_TZINFO=1 volumes: - ../mysql/conf.d:/etc/mysql/conf.d:ro - mysql-db-volume:/var/lib/mysql healthcheck: - test: ["CMD", "mysql", "-h", "localhost", "-P", "3306", "-u", "root", "-e", "SELECT 1"] + test: ["CMD", "mysqladmin", "status", "-h", "localhost", "-u", "root"] interval: 10s timeout: 10s retries: 5 restart: "on-failure" - command: ['mysqld', '--character-set-server=utf8mb4', - '--collation-server=utf8mb4_unicode_ci'] + command: [ + 'mysqld', + '--character-set-server=utf8mb4', + '--collation-server=utf8mb4_unicode_ci', + ] volumes: mysql-db-volume: diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index b7a2693faf294..d1ef014a8185b 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -44,7 +44,6 @@ from airflow import settings from airflow.exceptions import ( - AirflowDagCycleException, DuplicateTaskIdFound, FailFastDagInvalidTriggerRule, ParamValidationError, @@ -58,6 +57,7 @@ from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.deadline import DeadlineAlert from airflow.sdk.definitions.param import DagParam, ParamsDict +from airflow.sdk.exceptions import AirflowDagCycleException from airflow.timetables.base import Timetable from airflow.timetables.simple import ( AssetTriggeredTimetable, diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py 
index ed79ae4202ead..79bddbc5edc24 100644
--- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -34,12 +34,12 @@
 
 from airflow.configuration import conf
 from airflow.exceptions import (
-    AirflowDagCycleException,
     AirflowException,
     DuplicateTaskIdFound,
     TaskAlreadyInTaskGroup,
 )
 from airflow.sdk.definitions._internal.node import DAGNode, validate_group_key
+from airflow.sdk.exceptions import AirflowDagCycleException
 from airflow.utils.trigger_rule import TriggerRule
 
 if TYPE_CHECKING:
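
Reviewer note: a minimal sketch of how the new `timeout_with_traceback` context manager added to `airflow/utils/db.py` in this patch is meant to be used; the twenty-minute guard wrapped around `upgradedb`/`_create_db_from_orm` in `initdb` above follows the same shape. The `slow_migration` function and the 5-second limit here are hypothetical, for illustration only:

```python
import logging
import time

from airflow.utils.db import timeout_with_traceback

log = logging.getLogger(__name__)


def slow_migration():
    """Hypothetical stand-in for upgradedb() / _create_db_from_orm()."""
    time.sleep(10)


try:
    # SIGALRM-based, so Unix only and main-thread only (signal.signal raises
    # ValueError when called outside the main thread).
    with timeout_with_traceback(5, "Migration timed out."):
        slow_migration()
except Exception:
    # TimeoutException is defined inside the context manager, so callers cannot
    # import and catch it specifically; catch broadly. The handler has already
    # logged the full call stack at the moment the alarm fired.
    log.exception("Guarded operation did not finish in time")
```

Setting `SQLALCHEMY_ENGINE_DEBUG=1` in the environment additionally activates the `_setup_debug_logging_if_needed` path above: SQLAlchemy engine debug logging, `faulthandler` on stderr, and a daemon thread that dumps all thread stacks every 300 seconds.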