Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowException,
AirflowTaskTimeout,
)
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import Base, StringID
Expand All @@ -64,7 +65,6 @@
)
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET

if TYPE_CHECKING:
Expand Down Expand Up @@ -117,6 +117,30 @@ class FileLoadStat(NamedTuple):
warning_num: int


@contextlib.contextmanager
def timeout(seconds=1, error_message="Timeout"):
import logging

log = logging.getLogger(__name__)
error_message = error_message + ", PID: " + str(os.getpid())

def handle_timeout(signum, frame):
"""Log information and raises AirflowTaskTimeout."""
log.error("Process timed out, PID: %s", str(os.getpid()))
raise AirflowTaskTimeout(error_message)

try:
try:
signal.signal(signal.SIGALRM, handle_timeout)
signal.setitimer(signal.ITIMER_REAL, seconds)
except ValueError:
log.warning("timeout can't be used in the current context", exc_info=True)
yield
finally:
with contextlib.suppress(ValueError):
signal.setitimer(signal.ITIMER_REAL, 0)


class DagBag(LoggingMixin):
"""
A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings.
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
"remove_task_decorator": "airflow.sdk.definitions._internal.decorators.remove_task_decorator",
"fixup_decorator_warning_stack": "airflow.sdk.definitions._internal.decorators.fixup_decorator_warning_stack",
},
"timeout": {
"timeout": "airflow.sdk.execution_time.timeout.timeout",
},
}

add_deprecated_classes(__deprecated_classes, __name__)
88 changes: 0 additions & 88 deletions airflow-core/src/airflow/utils/timeout.py

This file was deleted.

11 changes: 11 additions & 0 deletions airflow-core/tests/unit/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ def setup_class(self):
def teardown_class(self):
db_clean_up()

def test_timeout_context_manager_raises_exception(self):
"""Test that the timeout context manager raises AirflowTaskTimeout when time limit is exceeded."""
import time

from airflow.exceptions import AirflowTaskTimeout
from airflow.models.dagbag import timeout

with pytest.raises(AirflowTaskTimeout):
with timeout(1, "Test timeout"):
time.sleep(2)

def test_get_existing_dag(self, tmp_path):
"""
Test that we're able to parse some example DAGs and retrieve them
Expand Down
Loading