Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion airflow/example_dags/example_xcomargs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, get_current_context, task
from airflow.utils.dates import days_ago

Expand All @@ -43,11 +44,26 @@ def print_value(value):
default_args={'owner': 'airflow'},
start_date=days_ago(2),
schedule_interval=None,
tags=['example']
tags=['example'],
) as dag:
task1 = PythonOperator(
task_id='generate_value',
python_callable=generate_value,
)

print_value(task1.output)


with DAG(
"example_xcom_args_with_operators",
default_args={'owner': 'airflow'},
start_date=days_ago(2),
schedule_interval=None,
tags=['example'],
) as dag2:
bash_op1 = BashOperator(task_id="c", bash_command="echo c")
bash_op2 = BashOperator(task_id="d", bash_command="echo c")
xcom_args_a = print_value("first!") # type: ignore
xcom_args_b = print_value("second!") # type: ignore

bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix. One other issue is that if we do this at this line:

xcom_args_a >> bash_op1 >>  xcom_args_b

Is this supposed to work? i have not tried yet, but looking at the code, I think BaseOperator.__rshift__ will complain that it can't handle xcom_args. That's a different issue though so the fix can be done separately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same DAG is used in tests to make sure that it works. BaseOperator._set_relatives handles XComArgs properly. However, XComArgs is missing in type hints due to cyclic imports. I think that once we have your PR merged we may consider something like TaskType that will be union of types that are "task-like"

18 changes: 17 additions & 1 deletion airflow/models/xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,29 @@ def __lshift__(self, other):
Implements XComArg << op
"""
self.set_upstream(other)
return self
return other

def __rshift__(self, other):
"""
Implements XComArg >> op
"""
self.set_downstream(other)
return other

def __rrshift__(self, other):
"""
Called for XComArg >> [XComArg] because list don't have
__rshift__ operators.
"""
self.__lshift__(other)
return self

def __rlshift__(self, other):
"""
Called for XComArg >> [XComArg] because list don't have
__lshift__ operators.
"""
self.__rshift__(other)
return self

def __getitem__(self, item):
Expand Down
18 changes: 10 additions & 8 deletions tests/models/test_xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,31 @@ def test_set_downstream(self):
with DAG("test_set_downstream", default_args=DEFAULT_ARGS):
op_a = BashOperator(task_id="a", bash_command="echo a")
op_b = BashOperator(task_id="b", bash_command="echo b")
bash_op = BashOperator(task_id="c", bash_command="echo c")
bash_op1 = BashOperator(task_id="c", bash_command="echo c")
bash_op2 = BashOperator(task_id="d", bash_command="echo c")
xcom_args_a = XComArg(op_a)
xcom_args_b = XComArg(op_b)

xcom_args_a >> xcom_args_b >> bash_op
bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2

assert len(op_a.downstream_list) == 2
assert op_a in bash_op1.downstream_list
assert op_b in op_a.downstream_list
assert bash_op in op_a.downstream_list
assert bash_op2 in op_b.downstream_list

def test_set_upstream(self):
with DAG("test_set_upstream", default_args=DEFAULT_ARGS):
op_a = BashOperator(task_id="a", bash_command="echo a")
op_b = BashOperator(task_id="b", bash_command="echo b")
bash_op = BashOperator(task_id="c", bash_command="echo c")
bash_op1 = BashOperator(task_id="c", bash_command="echo c")
bash_op2 = BashOperator(task_id="d", bash_command="echo c")
xcom_args_a = XComArg(op_a)
xcom_args_b = XComArg(op_b)

xcom_args_a << xcom_args_b << bash_op
bash_op1 << xcom_args_a << xcom_args_b << bash_op2

assert len(op_a.upstream_list) == 2
assert op_a in bash_op1.upstream_list
assert op_b in op_a.upstream_list
assert bash_op in op_a.upstream_list
assert bash_op2 in op_b.upstream_list

def test_xcom_arg_property_of_base_operator(self):
with DAG("test_xcom_arg_property_of_base_operator", default_args=DEFAULT_ARGS):
Expand Down