Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor cloudpickle support in Python operators/decorators #39270

Merged
merged 6 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 48 additions & 13 deletions airflow/decorators/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,15 @@ class TaskDecoratorCollection:
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
python_version: None | str | int | float = None,
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand All @@ -129,6 +130,13 @@ class TaskDecoratorCollection:
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:

- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
Expand All @@ -154,6 +162,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand All @@ -164,9 +175,10 @@ class TaskDecoratorCollection:
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
templates_dict: Mapping[str, Any] | None = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand All @@ -176,9 +188,13 @@ class TaskDecoratorCollection:
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:

- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -187,6 +203,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch( # type: ignore[misc]
Expand All @@ -211,14 +230,15 @@ class TaskDecoratorCollection:
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
python_version: None | str | int | float = None,
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
Expand All @@ -232,9 +252,13 @@ class TaskDecoratorCollection:
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:

- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
Expand All @@ -253,6 +277,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand All @@ -264,9 +291,10 @@ class TaskDecoratorCollection:
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
templates_dict: Mapping[str, Any] | None = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator.
Expand All @@ -279,9 +307,13 @@ class TaskDecoratorCollection:
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:

- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -290,6 +322,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch_external_python(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def tutorial_taskflow_api_virtualenv():
"""

@task.virtualenv(
use_dill=True,
serializer="dill", # Use `dill` for advanced serialization.
system_site_packages=False,
requirements=["funcsigs"],
)
Expand Down
Loading