Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-5390] Remove provide context #5990

Merged
merged 22 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow/contrib/example_dags/example_qubole_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ def compare_result(**kwargs):

t3 = PythonOperator(
task_id='compare_result',
provide_context=True,
python_callable=compare_result,
trigger_rule="all_done",
dag=dag)
Expand Down
1 change: 0 additions & 1 deletion airflow/example_dags/docker_copy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
#
# t_is_data_available = ShortCircuitOperator(
# task_id='check_if_data_available',
# provide_context=True,
# python_callable=is_data_available,
# dag=dag)
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def should_run(**kwargs):

cond = BranchPythonOperator(
task_id='condition',
provide_context=True,
python_callable=should_run,
dag=dag,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@
)


def my_py_command(**kwargs):
def my_py_command(test_mode, params):
"""
Print out the "foo" param passed in via
`airflow tasks test example_passing_params_via_test_command run_this <date>
-tp '{"foo":"bar"}'`
"""
if kwargs["test_mode"]:
if test_mode:
print(" 'foo' was passed in via test={} command : kwargs[params][foo] \
= {}".format(kwargs["test_mode"], kwargs["params"]["foo"]))
= {}".format(test_mode, params["foo"]))
# Print out the value of "miff", passed in below via the Python Operator
print(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
print(" 'miff' was passed in via task params = {}".format(params["miff"]))
return 1


Expand All @@ -58,7 +58,6 @@ def my_py_command(**kwargs):

run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=my_py_command,
params={"miff": "agg"},
dag=dag,
Expand Down
1 change: 0 additions & 1 deletion airflow/example_dags/example_python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def print_context(ds, **kwargs):

run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag,
)
Expand Down
1 change: 0 additions & 1 deletion airflow/example_dags/example_trigger_target_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def run_this_func(**kwargs):

run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
Expand Down
1 change: 0 additions & 1 deletion airflow/example_dags/example_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True,
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args)
Expand Down
39 changes: 19 additions & 20 deletions airflow/operators/python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import types
from textwrap import dedent
from typing import Optional, Iterable, Dict, Callable
from inspect import signature

import dill

Expand All @@ -51,12 +52,6 @@ class PythonOperator(BaseOperator):
:param op_args: a list of positional arguments that will get unpacked when
calling your callable
:type op_args: list (templated)
:param provide_context: if set to true, Airflow will pass a set of
keyword arguments that can be used in your function. This set of
kwargs correspond exactly to what you can use in your jinja
templates. For this to work, you need to define `**kwargs` in your
function header.
:type provide_context: bool
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -79,7 +74,6 @@ def __init__(
python_callable: Callable,
op_args: Optional[Iterable] = None,
op_kwargs: Optional[Dict] = None,
provide_context: bool = False,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[Iterable[str]] = None,
*args,
Expand All @@ -91,7 +85,6 @@ def __init__(
self.python_callable = python_callable
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
self.provide_context = provide_context
self.templates_dict = templates_dict
if templates_exts:
self.template_ext = templates_exts
Expand All @@ -104,10 +97,21 @@ def execute(self, context):
for k, v in airflow_context_vars.items()]))
os.environ.update(airflow_context_vars)

if self.provide_context:
context.update(self.op_kwargs)
context['templates_dict'] = self.templates_dict
context.update(self.op_kwargs)
context['templates_dict'] = self.templates_dict

if {parameter for name, parameter
in signature(self.python_callable).parameters.items()
if str(parameter).startswith("**")}:
Fokko marked this conversation as resolved.
Show resolved Hide resolved
# If there is a **kwargs, **context or **_ then just pass everything.
self.op_kwargs = context
else:
# If the callable names only specific arguments (for example, execution_date), then pass only those in :-)
self.op_kwargs = {
name: context[name] for name, parameter
in signature(self.python_callable).parameters.items()
if name in context # If it isn't available on the context, then ignore
Fokko marked this conversation as resolved.
Show resolved Hide resolved
}

return_value = self.execute_callable()
self.log.info("Done. Returned value was: %s", return_value)
Expand All @@ -130,6 +134,7 @@ class BranchPythonOperator(PythonOperator, SkipMixin):
downstream to allow for the DAG state to fill up and the DAG run's state
to be inferred.
"""

def execute(self, context):
branch = super().execute(context)
self.skip_all_except(context['ti'], branch)
Expand All @@ -147,6 +152,7 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):

The condition is determined by the result of `python_callable`.
"""

def execute(self, context):
condition = super().execute(context)
self.log.info("Condition result is %s", condition)
Expand Down Expand Up @@ -200,12 +206,6 @@ class PythonVirtualenvOperator(PythonOperator):
:type op_kwargs: list
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:type op_kwargs: dict
:param provide_context: if set to true, Airflow will pass a set of
keyword arguments that can be used in your function. This set of
kwargs correspond exactly to what you can use in your jinja
templates. For this to work, you need to define `**kwargs` in your
function header.
:type provide_context: bool
:param string_args: Strings that are present in the global var virtualenv_string_args,
available to python_callable at runtime as a list[str]. Note that args are split
by newline.
Expand All @@ -219,6 +219,7 @@ class PythonVirtualenvOperator(PythonOperator):
processing templated fields, for examples ``['.sql', '.hql']``
:type templates_exts: list[str]
"""

@apply_defaults
def __init__(
self,
Expand All @@ -229,7 +230,6 @@ def __init__(
system_site_packages: bool = True,
op_args: Iterable = None,
op_kwargs: Dict = None,
provide_context: bool = False,
string_args: Optional[Iterable[str]] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[Iterable[str]] = None,
Expand All @@ -242,7 +242,6 @@ def __init__(
op_kwargs=op_kwargs,
templates_dict=templates_dict,
templates_exts=templates_exts,
provide_context=provide_context,
*args,
**kwargs)
self.requirements = requirements or []
Expand Down Expand Up @@ -383,7 +382,7 @@ def _generate_python_code(self):
fn = self.python_callable
# don't try to read pickle if we didn't pass anything
if self._pass_op_args():
load_args_line = 'with open(sys.argv[1], "rb") as file: arg_dict = {}.load(file)'\
load_args_line = 'with open(sys.argv[1], "rb") as file: arg_dict = {}.load(file)' \
.format(pickling_library)
else:
load_args_line = 'arg_dict = {"args": [], "kwargs": {}}'
Expand Down
6 changes: 3 additions & 3 deletions docs/howto/operator/python.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ to the Python callable.
Templating
^^^^^^^^^^

When you set the ``provide_context`` argument to ``True``, Airflow passes in
an additional set of keyword arguments: one for each of the :doc:`Jinja
template variables <../../macros-ref>` and a ``templates_dict`` argument.
Airflow passes in an additional set of keyword arguments: one for each of the
:doc:`Jinja template variables <../../macros-ref>` and a ``templates_dict``
argument.

The ``templates_dict`` argument is templated, so each value in the dictionary
is evaluated as a :ref:`Jinja template <jinja-templating>`.
1 change: 0 additions & 1 deletion tests/contrib/operators/test_aws_athena_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def setUp(self):
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE,
'provide_context': True
}

self.dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once',
Expand Down
1 change: 0 additions & 1 deletion tests/contrib/operators/test_s3_to_sftp_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def setUp(self):
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE,
'provide_context': True
}
dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
dag.schedule_interval = '@once'
Expand Down
1 change: 0 additions & 1 deletion tests/contrib/operators/test_sftp_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def setUp(self):
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE,
'provide_context': True
}
dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
dag.schedule_interval = '@once'
Expand Down
1 change: 0 additions & 1 deletion tests/contrib/operators/test_sftp_to_s3_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def setUp(self):
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE,
'provide_context': True
}
dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
dag.schedule_interval = '@once'
Expand Down
1 change: 0 additions & 1 deletion tests/contrib/operators/test_ssh_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def setUp(self):
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE,
'provide_context': True
}
dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
dag.schedule_interval = '@once'
Expand Down
1 change: 0 additions & 1 deletion tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ def test_py_op(templates_dict, ds, **kwargs):

t = PythonOperator(
task_id='test_py_op',
provide_context=True,
python_callable=test_py_op,
templates_dict={'ds': "{{ ds }}"},
dag=self.dag)
Expand Down
1 change: 0 additions & 1 deletion tests/dags/test_cli_triggered_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,5 @@ def success(ti=None, *args, **kwargs):
dag1_task2 = PythonOperator(
task_id='test_run_dependent_task',
python_callable=success,
provide_context=True,
dag=dag1)
dag1_task1.set_downstream(dag1_task2)
15 changes: 7 additions & 8 deletions tests/operators/test_python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ def build_recording_function(calls_collection):
Then using this custom function recording custom Call objects for further testing
(replacing Mock.assert_called_with assertion method)
"""
def recording_function(*args, **kwargs):
calls_collection.append(Call(*args, **kwargs))
def recording_function(*args):
calls_collection.append(Call(*args))
return recording_function


Expand Down Expand Up @@ -129,11 +129,10 @@ def test_python_operator_python_callable_is_callable(self):
task_id='python_operator',
dag=self.dag)

def _assertCallsEqual(self, first, second):
def _assert_calls_equal(self, first, second):
self.assertIsInstance(first, Call)
self.assertIsInstance(second, Call)
self.assertTupleEqual(first.args, second.args)
self.assertDictEqual(first.kwargs, second.kwargs)

def test_python_callable_arguments_are_templatized(self):
"""Test PythonOperator op_args are templatized"""
Expand All @@ -148,7 +147,7 @@ def test_python_callable_arguments_are_templatized(self):
task_id='python_operator',
# a Mock instance cannot be used as a callable function or test fails with a
# TypeError: Object of type Mock is not JSON serializable
python_callable=(build_recording_function(recorded_calls)),
python_callable=build_recording_function(recorded_calls),
op_args=[
4,
date(2019, 1, 1),
Expand All @@ -167,7 +166,7 @@ def test_python_callable_arguments_are_templatized(self):

ds_templated = DEFAULT_DATE.date().isoformat()
self.assertEqual(1, len(recorded_calls))
self._assertCallsEqual(
self._assert_calls_equal(
recorded_calls[0],
Call(4,
date(2019, 1, 1),
Expand All @@ -183,7 +182,7 @@ def test_python_callable_keyword_arguments_are_templatized(self):
task_id='python_operator',
# a Mock instance cannot be used as a callable function or test fails with a
# TypeError: Object of type Mock is not JSON serializable
python_callable=(build_recording_function(recorded_calls)),
python_callable=build_recording_function(recorded_calls),
op_kwargs={
'an_int': 4,
'a_date': date(2019, 1, 1),
Expand All @@ -200,7 +199,7 @@ def test_python_callable_keyword_arguments_are_templatized(self):
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

self.assertEqual(1, len(recorded_calls))
self._assertCallsEqual(
self._assert_calls_equal(
recorded_calls[0],
Call(an_int=4,
a_date=date(2019, 1, 1),
Expand Down
18 changes: 2 additions & 16 deletions tests/operators/test_virtualenv_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,20 +199,6 @@ def f(_):
self._run_as_operator(f, op_args=[datetime.datetime.utcnow()])

def test_context(self):
def f(**kwargs):
return kwargs['templates_dict']['ds']
def f(templates_dict):
return templates_dict['ds']
self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})

def test_provide_context(self):
def fn():
pass
task = PythonVirtualenvOperator(
python_callable=fn,
python_version=sys.version_info[0],
task_id='task',
dag=self.dag,
provide_context=True,
)
self.assertTrue(
task.provide_context
)
2 changes: 0 additions & 2 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def task_callable(ti, **kwargs):
task_id='task_for_testing_file_log_handler',
dag=dag,
python_callable=task_callable,
provide_context=True
)
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)

Expand Down Expand Up @@ -123,7 +122,6 @@ def task_callable(ti, **kwargs):
task_id='task_for_testing_file_log_handler',
dag=dag,
python_callable=task_callable,
provide_context=True
)
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.try_number = 2
Expand Down