Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
AirflowClusterPolicyError,
AirflowClusterPolicySkipDag,
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowException,
)
Expand All @@ -67,6 +66,11 @@
from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET

try:
from airflow.sdk.exceptions import AirflowDagCycleException
except ImportError:
from airflow.exceptions import AirflowDagCycleException # type: ignore[no-redef]

if TYPE_CHECKING:
from collections.abc import Generator

Expand Down
3 changes: 1 addition & 2 deletions airflow-core/src/airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,7 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):
# running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
# 'READ COMMITTED' is the default value for PostgreSQL.
# More information here:
# https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html"

# https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
if SQL_ALCHEMY_CONN.startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"

Expand Down
87 changes: 82 additions & 5 deletions airflow-core/src/airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import json
import logging
import os
import signal
import sys
import time
import traceback
import warnings
from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
from tempfile import gettempdir
Expand Down Expand Up @@ -97,6 +99,45 @@ class MappedClassProtocol(Protocol):
}


@contextlib.contextmanager
def timeout_with_traceback(seconds, message="Operation timed out"):
    """
    Raise a TimeoutException after specified seconds.

    Logs the full call stack when timeout occurs.

    The timeout is driven by ``SIGALRM``. That signal does not exist on every platform
    (e.g. Windows) and Python only allows signal handlers to be installed from the main
    thread. When either condition is not met the wrapped block simply runs without a
    timeout instead of failing before it even starts.

    :param seconds: number of whole seconds to wait before the alarm fires
    :param message: text that is logged and used as the exception message on timeout
    :raises TimeoutException: a locally defined ``Exception`` subclass, raised inside the
        wrapped block when the alarm fires
    """
    # Imported locally so the block does not depend on a module-level import.
    import threading

    class TimeoutException(Exception):
        """Exception raised when a timeout occurs."""

    def timeout_handler(signum, frame):
        # Capture the full call stack of whatever was running when the alarm fired
        stack_trace = "".join(traceback.format_stack(frame))

        # Log the timeout and stack trace
        log.error(
            "\n%s after %s seconds\nFull call stack at timeout:\n%s",
            message,
            seconds,
            stack_trace,
        )

        raise TimeoutException(message)

    alarm_usable = hasattr(signal, "SIGALRM") and threading.current_thread() is threading.main_thread()
    if not alarm_usable:
        # No way to arm an alarm here; run the body unguarded rather than crash.
        yield
        return

    # Set the signal handler
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        # Armed inside the ``try`` so the handler is restored even if arming fails.
        signal.alarm(seconds)
        yield
    finally:
        # Cancel the alarm and restore the old handler
        signal.alarm(0)
        # ``signal.signal`` returns None when the previous handler was not installed from
        # Python; None cannot be passed back, so fall back to the default disposition.
        signal.signal(signal.SIGALRM, signal.SIG_DFL if old_handler is None else old_handler)


@provide_session
def merge_conn(conn: Connection, session: Session = NEW_SESSION):
"""Add new Connection."""
Expand Down Expand Up @@ -572,20 +613,55 @@ def get_default_connections():


def _create_db_from_orm(session):
    """Build all ORM-declared tables on the session's engine and stamp alembic at ``head``."""
    log.info("Creating Airflow database tables from the ORM")
    from alembic import command

    from airflow.models.base import Base

    # Opt-in diagnostics; the helper decides for itself whether to do anything
    _setup_debug_logging_if_needed()

    log.info("Creating context")
    with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
        # Everything below runs while the MIGRATIONS lock is held
        log.info("Binding engine")
        bound_engine = session.get_bind().engine
        log.info("Pool status: %s", bound_engine.pool.status())

        log.info("Creating metadata")
        Base.metadata.create_all(bound_engine)

        # Record the freshly created schema as being at the newest migration
        log.info("Getting alembic config")
        alembic_cfg = _get_alembic_config()
        command.stamp(alembic_cfg, "head")
        log.info("Airflow database tables created")


def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return

import faulthandler
import threading

# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)

# Enable Fault Handler
faulthandler.enable(file=sys.stderr, all_threads=True)

# Print Active Threads and Stack Traces Periodically
def dump_stacks():
while True:
for thread_id, frame in sys._current_frames().items():
log.info("\nThread %s stack:", thread_id)
traceback.print_stack(frame)
time.sleep(300)

threading.Thread(target=dump_stacks, daemon=True).start()


@provide_session
def initdb(session: Session = NEW_SESSION):
"""Initialize Airflow database."""
Expand All @@ -596,10 +672,11 @@ def initdb(session: Session = NEW_SESSION):
import_all_models()

db_exists = _get_current_revision(session)
if db_exists:
upgradedb(session=session)
else:
_create_db_from_orm(session=session)
with timeout_with_traceback(60 * 20, "DB upgrade/creation timed out."):
if db_exists:
upgradedb(session=session)
else:
_create_db_from_orm(session=session)

external_db_manager.initdb(session)
# Add default pool & sync log_template
Expand Down
4 changes: 3 additions & 1 deletion airflow-core/tests/unit/models/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
[
pytest.param("postgresql://host/the_database", {}, {}, id="postgres"),
pytest.param("mysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql"),
pytest.param("mysql+pymsql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql+pymsql"),
pytest.param(
"mysql+pymysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql+pymysql"
),
pytest.param(
"mysql://host/the_database",
{"collation": "ascii"},
Expand Down
2 changes: 2 additions & 0 deletions providers/mysql/src/airflow/providers/mysql/hooks/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ def get_uri(self) -> str:
# Determine URI prefix based on client
if client_name == "mysql-connector-python":
uri_prefix = "mysql+mysqlconnector://"
elif client_name == "pymysql":
uri_prefix = "mysql+pymysql://"
else: # default: mysqlclient
uri_prefix = "mysql://"

Expand Down
18 changes: 17 additions & 1 deletion providers/mysql/tests/unit/mysql/hooks/test_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
MYSQL_AVAILABLE = False

from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.utils import timezone

try:
from airflow.sdk import timezone
except ImportError:
from airflow.utils import timezone # type: ignore[attr-defined,no-redef]

from tests_common.test_utils.asserts import assert_equal_ignore_multiple_spaces

Expand Down Expand Up @@ -125,6 +129,18 @@ def test_dummy_connection_setter(self, mock_connect):
"mysql+mysqlconnector://user%40domain:password@host/schema",
id="mysql_connector_python",
),
pytest.param(
{
"login": "user@domain",
"password": "password",
"host": "host",
"schema": "schema",
"port": None,
"extra": json.dumps({"client": "pymysql"}),
},
"mysql+pymysql://user%40domain:password@host/schema",
id="mysql_connector_pymysql",
),
pytest.param(
{
"login": "user@domain",
Expand Down
10 changes: 7 additions & 3 deletions scripts/ci/docker-compose/backend-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ services:
- MYSQL_ALLOW_EMPTY_PASSWORD=true
- MYSQL_ROOT_HOST=%
- MYSQL_DATABASE=airflow
- MYSQL_INITDB_SKIP_TZINFO=1
volumes:
- ../mysql/conf.d:/etc/mysql/conf.d:ro
- mysql-db-volume:/var/lib/mysql
healthcheck:
test: ["CMD", "mysql", "-h", "localhost", "-P", "3306", "-u", "root", "-e", "SELECT 1"]
test: ["CMD", "mysqladmin", "status", "-h", "localhost", "-u", "root"]
interval: 10s
timeout: 10s
retries: 5
restart: "on-failure"
command: ['mysqld', '--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci']
command: [
'mysqld',
'--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci',
]
volumes:
mysql-db-volume:
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

from airflow import settings
from airflow.exceptions import (
AirflowDagCycleException,
DuplicateTaskIdFound,
FailFastDagInvalidTriggerRule,
ParamValidationError,
Expand All @@ -58,6 +57,7 @@
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.deadline import DeadlineAlert
from airflow.sdk.definitions.param import DagParam, ParamsDict
from airflow.sdk.exceptions import AirflowDagCycleException
from airflow.timetables.base import Timetable
from airflow.timetables.simple import (
AssetTriggeredTimetable,
Expand Down
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/definitions/taskgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@

from airflow.configuration import conf
from airflow.exceptions import (
AirflowDagCycleException,
AirflowException,
DuplicateTaskIdFound,
TaskAlreadyInTaskGroup,
)
from airflow.sdk.definitions._internal.node import DAGNode, validate_group_key
from airflow.sdk.exceptions import AirflowDagCycleException
from airflow.utils.trigger_rule import TriggerRule

if TYPE_CHECKING:
Expand Down
Loading