Skip to content

Commit

Permalink
[AIRFLOW-3123] Use a stack for DAG context management (apache#3956)
Browse files Browse the repository at this point in the history
  • Loading branch information
newtonle authored and wayne.morris committed Jul 29, 2019
1 parent 78b36cb commit 2976e9a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
11 changes: 4 additions & 7 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3391,7 +3391,7 @@ def __init__(
self.on_success_callback = on_success_callback
self.on_failure_callback = on_failure_callback

self._context_manager_set = False
self._old_context_manager_dags = []

self._comps = {
'dag_id',
Expand Down Expand Up @@ -3440,16 +3440,13 @@ def __hash__(self):

def __enter__(self):
global _CONTEXT_MANAGER_DAG
if not self._context_manager_set:
self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
_CONTEXT_MANAGER_DAG = self
self._context_manager_set = True
self._old_context_manager_dags.append(_CONTEXT_MANAGER_DAG)
_CONTEXT_MANAGER_DAG = self
return self

def __exit__(self, _type, _value, _tb):
global _CONTEXT_MANAGER_DAG
_CONTEXT_MANAGER_DAG = self._old_context_manager_dag
self._context_manager_set = False
_CONTEXT_MANAGER_DAG = self._old_context_manager_dags.pop()

# /Context Manager ----------------------------------------------

Expand Down
8 changes: 5 additions & 3 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ def test_dag_as_context_manager(self):
with dag:
with dag:
op7 = DummyOperator(task_id='op7')
op8 = DummyOperator(task_id='op8')
op8.dag = dag2
op8 = DummyOperator(task_id='op8')
op9 = DummyOperator(task_id='op8')
op9.dag = dag2

self.assertEqual(op7.dag, dag)
self.assertEqual(op8.dag, dag2)
self.assertEqual(op8.dag, dag)
self.assertEqual(op9.dag, dag2)

def test_dag_topological_sort(self):
dag = DAG(
Expand Down

0 comments on commit 2976e9a

Please sign in to comment.