Skip to content

Commit 29467c3

Browse files
potiukCloud Composer Team
authored and
Cloud Composer Team
committed
Change the storage of frame to use threadLocal rather than Dict (#21993)
There is a very probable WeakKeyDict bug in Python standard library (to be confirmed and investigated further) that manifests itself in a very rare failure of the test_stacktrace_on_failure_starts_with_task_execute_method This turned out to be related to an unexpected behaviour (and most likely a bug - to be confirmed) of WeakKeyDict when you have potentially two different objects with the same `equals` and `hash` values added to the same WeakKeyDict as keys. More info on similar report (but raised for a bit different reason) bug in Python can be found here: https://bugs.python.org/issue44140 We submitted a PR to fix the problem found python/cpython#31685 GitOrigin-RevId: 1949f5d76b5842d56db91c868ae4655bb7a7689f
1 parent 8190aec commit 29467c3

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

airflow/models/taskinstance.py

+35-12
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
import os
2424
import pickle
2525
import signal
26+
import threading
2627
import warnings
2728
from collections import defaultdict
2829
from datetime import datetime, timedelta
2930
from functools import partial
3031
from inspect import currentframe
3132
from tempfile import NamedTemporaryFile
32-
from types import FrameType
33+
from types import TracebackType
3334
from typing import (
3435
IO,
3536
TYPE_CHECKING,
@@ -45,7 +46,6 @@
4546
Union,
4647
)
4748
from urllib.parse import quote
48-
from weakref import WeakKeyDictionary
4949

5050
import dill
5151
import jinja2
@@ -129,7 +129,7 @@
129129
from airflow.models.dagrun import DagRun
130130
from airflow.models.operator import Operator
131131

132-
_EXECUTION_FRAME_MAPPING: "WeakKeyDictionary[Operator, FrameType]" = WeakKeyDictionary()
132+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE = threading.local()
133133

134134

135135
@contextlib.contextmanager
@@ -1537,7 +1537,7 @@ def _execute_task(self, context, task_copy):
15371537
else:
15381538
result = execute_callable(context=context)
15391539
except: # noqa: E722
1540-
_EXECUTION_FRAME_MAPPING[task_copy] = currentframe()
1540+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = currentframe()
15411541
raise
15421542
# If the task returns a result, push an XCom containing it
15431543
if task_copy.do_xcom_push and result is not None:
@@ -1731,6 +1731,36 @@ def _handle_reschedule(
17311731
session.commit()
17321732
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
17331733

1734+
def get_truncated_error_traceback(self, error: BaseException) -> Optional[TracebackType]:
1735+
"""
1736+
Returns truncated error traceback.
1737+
1738+
This method returns traceback of the error truncated to the
1739+
frame saved by earlier try/except along the way. If the frame
1740+
is found, the traceback will be truncated to below the frame.
1741+
1742+
:param error: exception to get traceback from
1743+
:return: traceback to print
1744+
"""
1745+
tb = error.__traceback__
1746+
try:
1747+
execution_frame = _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame
1748+
except AttributeError:
1749+
self.log.warning(
1750+
"We expected to get frame set in local storage but it was not."
1751+
" Please report this as an issue with full logs"
1752+
" at https://github.com/apache/airflow/issues/new",
1753+
exc_info=True,
1754+
)
1755+
return tb
1756+
_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = None
1757+
while tb is not None:
1758+
if tb.tb_frame is execution_frame:
1759+
tb = tb.tb_next
1760+
break
1761+
tb = tb.tb_next
1762+
return tb or error.__traceback__
1763+
17341764
@provide_session
17351765
def handle_failure(
17361766
self,
@@ -1746,14 +1776,7 @@ def handle_failure(
17461776

17471777
if error:
17481778
if isinstance(error, BaseException):
1749-
execution_frame = _EXECUTION_FRAME_MAPPING.get(self.task)
1750-
tb = error.__traceback__
1751-
while tb is not None:
1752-
if tb.tb_frame is execution_frame:
1753-
tb = tb.tb_next
1754-
break
1755-
tb = tb.tb_next
1756-
tb = tb or error.__traceback__
1779+
tb = self.get_truncated_error_traceback(error)
17571780
self.log.error("Task failed with exception", exc_info=(type(error), error, tb))
17581781
else:
17591782
self.log.error("%s", error)

0 commit comments

Comments
 (0)