[AIRFLOW-5390] Remove provide context #5990

Merged: 22 commits, Sep 10, 2019
42 changes: 42 additions & 0 deletions UPDATING.md
@@ -37,9 +37,51 @@ assists users migrating to a new version.
- [Airflow 1.7.1.2](#airflow-1712)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

## Airflow Master

### Remove provide_context

The `provide_context` argument of the `PythonOperator` has been removed. The signature of the callable passed to the `PythonOperator` is now inferred, and argument values are always provided automatically. There is no longer any need to explicitly provide or withhold the context. For example:

```python
def myfunc(execution_date):
    print(execution_date)


python_operator = PythonOperator(task_id='mytask', python_callable=myfunc, dag=dag)
```

Notice that you no longer have to set `provide_context=True`; variables from the task context are now detected and provided automatically.

All context variables can still be provided with a double-asterisk argument:

```python
def myfunc(**context):
    print(context)  # the full task context is provided


python_operator = PythonOperator(task_id='mytask', python_callable=myfunc)
```

The task context variable names are reserved names in the callable function; a clash between them and `op_args` or `op_kwargs` therefore raises an exception:

```python
def myfunc(dag):
    # Raises a ValueError because "dag" is a reserved context name.
    # A valid signature would be, for example, myfunc(mydag).
    pass


python_operator = PythonOperator(
    task_id='mytask',
    op_args=[1],
    python_callable=myfunc,
)
```

The change is backwards compatible: setting `provide_context` will add a `provide_context` key to the kwargs, but it no longer has any effect.

PR: [#5990](https://github.com/apache/airflow/pull/5990)

### Changes to FileSensor

`FileSensor` now takes a glob pattern, not just a filename. If the filename you are looking for contains `*`, `?`, or `[`, you should escape these as `[*]`, `[?]`, and `[[]`.
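The escaping rule above can be checked with the standard library's `fnmatch` and `glob` modules (this is an illustration of the glob semantics, not FileSensor's own code; the filenames are hypothetical):

```python
import fnmatch
import glob

# A literal "*" must be written as "[*]" to match itself rather than
# act as a wildcard.
assert fnmatch.fnmatch("a*b", "a[*]b")      # the literal * matches
assert not fnmatch.fnmatch("aXb", "a[*]b")  # no longer a wildcard

# glob.escape applies the same bracket escaping for *, ? and [.
escaped = glob.escape("report[1].csv")
print(escaped)  # report[[]1].csv
```

`glob.escape` is a convenient way to build such patterns programmatically instead of escaping by hand.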

### Change dag loading duration metric name
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -75,7 +75,6 @@ def grabArtifactFromJenkins(**context):

artifact_grabber = PythonOperator(
task_id='artifact_grabber',
provide_context=True,
python_callable=grabArtifactFromJenkins,
dag=dag)

1 change: 0 additions & 1 deletion airflow/contrib/example_dags/example_qubole_operator.py
@@ -97,7 +97,6 @@ def compare_result(**kwargs):

t3 = PythonOperator(
task_id='compare_result',
provide_context=True,
python_callable=compare_result,
trigger_rule="all_done",
dag=dag)
27 changes: 10 additions & 17 deletions airflow/contrib/sensors/python_sensor.py
@@ -16,9 +16,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.operators.python_operator import PythonOperator

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from typing import Optional, Dict, Callable, List


class PythonSensor(BaseSensorOperator):
@@ -38,12 +40,6 @@ class PythonSensor(BaseSensorOperator):
:param op_args: a list of positional arguments that will get unpacked when
calling your callable
:type op_args: list
:param provide_context: if set to true, Airflow will pass a set of
keyword arguments that can be used in your function. This set of
kwargs correspond exactly to what you can use in your jinja
templates. For this to work, you need to define `**kwargs` in your
function header.
:type provide_context: bool
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
@@ -56,24 +52,21 @@
@apply_defaults
def __init__(
self,
python_callable,
op_args=None,
op_kwargs=None,
provide_context=False,
templates_dict=None,
python_callable: Callable,
op_args: Optional[List] = None,
op_kwargs: Optional[Dict] = None,
templates_dict: Optional[Dict] = None,
*args, **kwargs):
super().__init__(*args, **kwargs)
self.python_callable = python_callable
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
self.provide_context = provide_context
self.templates_dict = templates_dict

def poke(self, context):
if self.provide_context:
context.update(self.op_kwargs)
context['templates_dict'] = self.templates_dict
self.op_kwargs = context
def poke(self, context: Dict):
context.update(self.op_kwargs)
context['templates_dict'] = self.templates_dict
self.op_kwargs = PythonOperator.determine_op_kwargs(self.python_callable, context, len(self.op_args))

self.log.info("Poking callable: %s", str(self.python_callable))
return_value = self.python_callable(*self.op_args, **self.op_kwargs)
1 change: 0 additions & 1 deletion airflow/example_dags/docker_copy_data.py
@@ -69,7 +69,6 @@
#
# t_is_data_available = ShortCircuitOperator(
# task_id='check_if_data_available',
# provide_context=True,
# python_callable=is_data_available,
# dag=dag)
#
@@ -58,7 +58,6 @@ def should_run(**kwargs):

cond = BranchPythonOperator(
task_id='condition',
provide_context=True,
python_callable=should_run,
dag=dag,
)
@@ -37,17 +37,17 @@
)


def my_py_command(**kwargs):
def my_py_command(test_mode, params):
"""
Print out the "foo" param passed in via
`airflow tasks test example_passing_params_via_test_command run_this <date>
-tp '{"foo":"bar"}'`
"""
if kwargs["test_mode"]:
if test_mode:
print(" 'foo' was passed in via test={} command : kwargs[params][foo] \
= {}".format(kwargs["test_mode"], kwargs["params"]["foo"]))
= {}".format(test_mode, params["foo"]))
# Print out the value of "miff", passed in below via the Python Operator
print(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"]))
print(" 'miff' was passed in via task params = {}".format(params["miff"]))
return 1


@@ -58,7 +58,6 @@ def my_py_command(**kwargs):

run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=my_py_command,
params={"miff": "agg"},
dag=dag,
1 change: 0 additions & 1 deletion airflow/example_dags/example_python_operator.py
@@ -48,7 +48,6 @@ def print_context(ds, **kwargs):

run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag,
)
1 change: 0 additions & 1 deletion airflow/example_dags/example_trigger_target_dag.py
@@ -69,7 +69,6 @@ def run_this_func(**kwargs):

run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
1 change: 0 additions & 1 deletion airflow/example_dags/example_xcom.py
@@ -26,7 +26,6 @@
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True,
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args)
1 change: 0 additions & 1 deletion airflow/gcp/utils/mlengine_operator_utils.py
@@ -241,7 +241,6 @@ def apply_validate_fn(*args, **kwargs):
evaluate_validation = PythonOperator(
task_id=(task_prefix + "-validation"),
python_callable=apply_validate_fn,
provide_context=True,
templates_dict={"prediction_path": prediction_path},
dag=dag)
evaluate_validation.set_upstream(evaluate_summary)
83 changes: 54 additions & 29 deletions airflow/operators/python_operator.py
@@ -23,8 +23,10 @@
import subprocess
import sys
import types
from inspect import signature
from itertools import islice
from textwrap import dedent
from typing import Optional, Iterable, Dict, Callable
from typing import Optional, Iterable, Dict, Callable, List

import dill

@@ -51,12 +53,6 @@ class PythonOperator(BaseOperator):
:param op_args: a list of positional arguments that will get unpacked when
calling your callable
:type op_args: list (templated)
:param provide_context: if set to true, Airflow will pass a set of
keyword arguments that can be used in your function. This set of
kwargs correspond exactly to what you can use in your jinja
templates. For this to work, you need to define `**kwargs` in your
function header.
:type provide_context: bool
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
@@ -77,11 +73,10 @@
def __init__(
self,
python_callable: Callable,
op_args: Optional[Iterable] = None,
op_args: Optional[List] = None,
op_kwargs: Optional[Dict] = None,
provide_context: bool = False,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[Iterable[str]] = None,
templates_exts: Optional[List[str]] = None,
*args,
**kwargs
) -> None:
@@ -91,23 +86,58 @@ def __init__(
self.python_callable = python_callable
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
self.provide_context = provide_context
self.templates_dict = templates_dict
if templates_exts:
self.template_ext = templates_exts

def execute(self, context):
@staticmethod
def determine_op_kwargs(python_callable: Callable,
context: Dict,
num_op_args: int = 0) -> Dict:
"""
Function that will inspect the signature of a python_callable to determine which
values need to be passed to the function.

:param python_callable: The function that you want to invoke
:param context: The context provided by the execute method of the Operator/Sensor
:param num_op_args: The number of op_args provided, so we know how many to skip
:return: The op_args dictionary which contains the values that are compatible with the Callable
"""
context_keys = context.keys()
sig = signature(python_callable).parameters.items()
op_args_names = islice(sig, num_op_args)
for name, _ in op_args_names:
# Check if it is part of the context
if name in context_keys:
# Raise an exception to let the user know that the keyword is reserved
raise ValueError(
"The key {} in the op_args is part of the context, and therefore reserved".format(name)
)

if any(str(param).startswith("**") for _, param in sig):
# If there is a ** argument then just dump everything.
op_kwargs = context
else:
# If there is only for example, an execution_date, then pass only these in :-)
op_kwargs = {
name: context[name]
for name, _ in sig
if name in context # If it isn't available on the context, then ignore
}
return op_kwargs
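
The inference logic above can be exercised outside Airflow; a minimal standalone sketch mirroring `determine_op_kwargs` (the `infer_kwargs` helper and the context dict here are hypothetical, for illustration only):

```python
from inspect import signature
from itertools import islice
from typing import Callable, Dict


def infer_kwargs(func: Callable, context: Dict, num_op_args: int = 0) -> Dict:
    """Select context values to pass, based on the callable's signature."""
    params = signature(func).parameters.items()
    # Positional op_args must not shadow reserved context keys.
    for name, _ in islice(params, num_op_args):
        if name in context:
            raise ValueError(
                "The key {} in the op_args is part of the context, "
                "and therefore reserved".format(name))
    # A **kwargs parameter receives the entire context.
    if any(str(param).startswith("**") for _, param in params):
        return dict(context)
    # Otherwise pass only the context keys named in the signature.
    return {name: context[name] for name, _ in params if name in context}


def myfunc(execution_date):
    return execution_date


context = {"execution_date": "2019-09-10", "dag": object(), "ti": object()}
print(infer_kwargs(myfunc, context))  # only execution_date is selected
```

This is why `def myfunc(execution_date)` receives just that one value while `def myfunc(**context)` receives everything.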

def execute(self, context: Dict):
# Export context to make it available for callables to use.
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
self.log.info("Exporting the following env vars:\n%s",
'\n'.join(["{}={}".format(k, v)
for k, v in airflow_context_vars.items()]))
os.environ.update(airflow_context_vars)

if self.provide_context:
context.update(self.op_kwargs)
context['templates_dict'] = self.templates_dict
self.op_kwargs = context
context.update(self.op_kwargs)
context['templates_dict'] = self.templates_dict

self.op_kwargs = PythonOperator.determine_op_kwargs(self.python_callable, context, len(self.op_args))

return_value = self.execute_callable()
self.log.info("Done. Returned value was: %s", return_value)
@@ -130,7 +160,8 @@ class BranchPythonOperator(PythonOperator, SkipMixin):
downstream to allow for the DAG state to fill up and the DAG run's state
to be inferred.
"""
def execute(self, context):

def execute(self, context: Dict):
branch = super().execute(context)
self.skip_all_except(context['ti'], branch)

@@ -147,7 +178,8 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):

The condition is determined by the result of `python_callable`.
"""
def execute(self, context):

def execute(self, context: Dict):
condition = super().execute(context)
self.log.info("Condition result is %s", condition)

@@ -200,12 +232,6 @@ class PythonVirtualenvOperator(PythonOperator):
:type op_kwargs: list
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:type op_kwargs: dict
:param provide_context: if set to true, Airflow will pass a set of
keyword arguments that can be used in your function. This set of
kwargs correspond exactly to what you can use in your jinja
templates. For this to work, you need to define `**kwargs` in your
function header.
:type provide_context: bool
:param string_args: Strings that are present in the global var virtualenv_string_args,
available to python_callable at runtime as a list[str]. Note that args are split
by newline.
@@ -219,6 +245,7 @@ class PythonVirtualenvOperator(PythonOperator):
processing templated fields, for examples ``['.sql', '.hql']``
:type templates_exts: list[str]
"""

@apply_defaults
def __init__(
self,
@@ -229,7 +256,6 @@ def __init__(
system_site_packages: bool = True,
op_args: Iterable = None,
op_kwargs: Dict = None,
provide_context: bool = False,
string_args: Optional[Iterable[str]] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[Iterable[str]] = None,
@@ -242,7 +268,6 @@ def __init__(
op_kwargs=op_kwargs,
templates_dict=templates_dict,
templates_exts=templates_exts,
provide_context=provide_context,
*args,
**kwargs)
self.requirements = requirements or []
@@ -264,8 +289,8 @@ def __init__(
self.__class__.__name__)
# check that args are passed iff python major version matches
if (python_version is not None and
str(python_version)[0] != str(sys.version_info[0]) and
self._pass_op_args()):
str(python_version)[0] != str(sys.version_info[0]) and
self._pass_op_args()):
raise AirflowException("Passing op_args or op_kwargs is not supported across "
"different Python major versions "
"for PythonVirtualenvOperator. "
@@ -383,7 +408,7 @@ def _generate_python_code(self):
fn = self.python_callable
# dont try to read pickle if we didnt pass anything
if self._pass_op_args():
load_args_line = 'with open(sys.argv[1], "rb") as file: arg_dict = {}.load(file)'\
load_args_line = 'with open(sys.argv[1], "rb") as file: arg_dict = {}.load(file)' \
.format(pickling_library)
else:
load_args_line = 'arg_dict = {"args": [], "kwargs": {}}'