Replace dill package to use cloudpickle #38531

Merged · 7 commits · Apr 18, 2024
6 changes: 3 additions & 3 deletions INSTALL
@@ -262,9 +262,9 @@ Those extras are available as regular core airflow extras - they install optiona

# START CORE EXTRAS HERE

aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, deprecated-api, github-enterprise,
google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs,
saml, sentry, statsd, uv, virtualenv
aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, github-
enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic,
rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv

# END CORE EXTRAS HERE

88 changes: 67 additions & 21 deletions airflow/operators/python.py
@@ -36,8 +36,6 @@
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast

import dill

from airflow.compat.functools import cache
from airflow.exceptions import (
AirflowConfigException,
@@ -58,6 +56,15 @@
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script

log = logging.getLogger(__name__)

# Prefer cloudpickle when it is importable, fall back to dill; otherwise plain
# pickle is used later via ``pickling_library``.
if importlib.util.find_spec("cloudpickle"):
    import cloudpickle as serialization_library
elif importlib.util.find_spec("dill"):
    import dill as serialization_library
else:
    log.debug("Neither cloudpickle nor dill is installed. Please install one with: pip install <name>")

if TYPE_CHECKING:
from pendulum.datetime import DateTime

@@ -394,6 +401,7 @@ def __init__(
*,
python_callable: Callable,
use_dill: bool = False,
use_cloudpickle: bool = False,
op_args: Collection[Any] | None = None,
op_kwargs: Mapping[str, Any] | None = None,
string_args: Iterable[str] | None = None,
@@ -420,8 +428,15 @@
**kwargs,
)
self.string_args = string_args or []
self.use_dill = use_dill
self.pickling_library = dill if self.use_dill else pickle
if use_dill and use_cloudpickle:
raise AirflowException(
"Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please,"
" choose only one."
)
if use_dill:
use_cloudpickle = use_dill
self.use_cloudpickle = use_cloudpickle
self.pickling_library = serialization_library if self.use_cloudpickle else pickle
self.expect_airflow = expect_airflow
self.skip_on_exit_code = (
skip_on_exit_code
@@ -555,6 +570,9 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
:param use_cloudpickle: Whether to use cloudpickle to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include cloudpickle in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
@@ -597,6 +615,7 @@ def __init__(
requirements: None | Iterable[str] | str = None,
python_version: str | None = None,
use_dill: bool = False,
use_cloudpickle: bool = False,
system_site_packages: bool = True,
pip_install_options: list[str] | None = None,
op_args: Collection[Any] | None = None,
@@ -627,6 +646,13 @@
RemovedInAirflow3Warning,
stacklevel=2,
)
if use_dill and use_cloudpickle:
raise AirflowException(
"Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please, "
"choose only one."
)
if use_dill:
use_cloudpickle = use_dill
if not is_venv_installed():
raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
if not requirements:
@@ -647,7 +673,7 @@ def __init__(
self.venv_cache_path = venv_cache_path
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
use_cloudpickle=use_cloudpickle,
op_args=op_args,
op_kwargs=op_kwargs,
string_args=string_args,
@@ -658,11 +684,12 @@
**kwargs,
)

def _requirements_list(self) -> list[str]:
def _requirements_list(self, exclude_cloudpickle: bool = False) -> list[str]:
"""Prepare a list of requirements that need to be installed for the virtual environment."""
requirements = [str(dependency) for dependency in self.requirements]
if not self.system_site_packages and self.use_dill and "dill" not in requirements:
requirements.append("dill")
if not exclude_cloudpickle:
if not self.system_site_packages and self.use_cloudpickle and "cloudpickle" not in requirements:
requirements.append("cloudpickle")
requirements.sort() # Ensure a hash is stable
return requirements

@@ -679,7 +706,7 @@ def _prepare_venv(self, venv_path: Path) -> None:
index_urls=self.index_urls,
)

def _calculate_cache_hash(self) -> tuple[str, str]:
def _calculate_cache_hash(self, exclude_cloudpickle: bool = False) -> tuple[str, str]:
"""Generate the hash of the cache folder to use.

The following factors are used as input for the hash:
@@ -693,7 +720,7 @@ def _calculate_cache_hash(self) -> tuple[str, str]:
Returns a hash and the data dict which is the base for the hash as text.
"""
hash_dict = {
"requirements_list": self._requirements_list(),
"requirements_list": self._requirements_list(exclude_cloudpickle=exclude_cloudpickle),
"pip_install_options": self.pip_install_options,
"index_urls": self.index_urls,
"cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")),
@@ -724,14 +751,22 @@ def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
self.log.info("Re-using cached Python virtual environment in %s", venv_path)
return venv_path

self.log.error(
"Unicorn alert: Found a previous virtual environment in %s "
"with the same hash but different parameters. Previous setup: '%s' / "
"Requested venv setup: '%s'. Please report a bug to airflow!",
venv_path,
previous_hash_data,
hash_data,
)
_, hash_data_before_upgrade = self._calculate_cache_hash(exclude_cloudpickle=True)
if previous_hash_data == hash_data_before_upgrade:
self.log.warning(
"Found a previous virtual environment in with outdated dependencies %s, "
"deleting and re-creating.",
venv_path,
)
else:
self.log.error(
"Unicorn alert: Found a previous virtual environment in %s "
"with the same hash but different parameters. Previous setup: '%s' / "
"Requested venv setup: '%s'. Please report a bug to airflow!",
venv_path,
previous_hash_data,
hash_data,
)
else:
self.log.warning(
"Found a previous (probably partial installed) virtual environment in %s, "
@@ -820,10 +855,14 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtual environment
defined with def, which will be run in a virtual environment.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but if dill is not preinstalled in your virtual environment, the task will fail with use_dill enabled.
but requires you to include dill in your requirements.
:param use_cloudpickle: Whether to use cloudpickle to serialize
the args and result (pickle is default). This allows more complex types
but if cloudpickle is not preinstalled in your virtual environment, the task will fail
with use_cloudpickle enabled.
:param op_args: A list of positional arguments to pass to python_callable.
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:param string_args: Strings that are present in the global var virtualenv_string_args,
@@ -851,6 +890,7 @@ def __init__(
python: str,
python_callable: Callable,
use_dill: bool = False,
use_cloudpickle: bool = False,
op_args: Collection[Any] | None = None,
op_kwargs: Mapping[str, Any] | None = None,
string_args: Iterable[str] | None = None,
@@ -863,11 +903,17 @@
):
if not python:
raise ValueError("Python Path must be defined in ExternalPythonOperator")
if use_dill and use_cloudpickle:
raise AirflowException(
"Both 'use_dill' and 'use_cloudpickle' parameters are set to True. Please, choose only one."
)
if use_dill:
use_cloudpickle = use_dill
self.python = python
self.expect_pendulum = expect_pendulum
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
use_cloudpickle=use_cloudpickle,
op_args=op_args,
op_kwargs=op_kwargs,
string_args=string_args,
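
For illustration only (not part of the diff): a minimal sketch of how the new flag is used, assuming the usual
surrounding DAG context. The task id, callable, and argument values are hypothetical; ``use_cloudpickle``,
``op_args``, and ``system_site_packages`` are the operator parameters shown above.

```python
from airflow.operators.python import PythonVirtualenvOperator

def add(x: int, y: int) -> int:
    return x + y

serialize_task = PythonVirtualenvOperator(
    task_id="serialize_with_cloudpickle",  # hypothetical task id
    python_callable=add,
    op_args=[1, 2],
    system_site_packages=False,  # cloudpickle is then auto-appended to the venv requirements
    use_cloudpickle=True,  # combining this with use_dill=True raises AirflowException
)
```

Per ``_requirements_list`` above, ``cloudpickle`` is appended to the requirements automatically when
``system_site_packages=False`` and ``use_cloudpickle=True``, so it does not need to be listed explicitly.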
6 changes: 3 additions & 3 deletions contributing-docs/12_airflow_dependencies_and_extras.rst
@@ -164,9 +164,9 @@ Those extras are available as regular core airflow extras - they install optiona

.. START CORE EXTRAS HERE

aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, deprecated-api, github-enterprise,
google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs,
saml, sentry, statsd, uv, virtualenv
aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, github-
enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic,
rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv

.. END CORE EXTRAS HERE

2 changes: 2 additions & 0 deletions docs/apache-airflow/extra-packages-ref.rst
@@ -87,6 +87,8 @@ python dependencies for the provided package.
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
| virtualenv | ``pip install 'apache-airflow[virtualenv]'`` | Running python tasks in local virtualenv |
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
| cloudpickle         | ``pip install 'apache-airflow[cloudpickle]'``       | Cloudpickle hooks and operators                                              |
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+


Providers extras
5 changes: 5 additions & 0 deletions docs/apache-airflow/howto/operator/python.rst
@@ -111,6 +111,11 @@ Use the :class:`~airflow.operators.python.PythonVirtualenvOperator` decorator to
inside a new Python virtual environment. The ``virtualenv`` package needs to be installed in the environment
that runs Airflow (as optional dependency ``pip install apache-airflow[virtualenv] --constraint ...``).

Additionally, the ``cloudpickle`` package needs to be installed as an optional dependency with
``pip install 'apache-airflow[cloudpickle]' --constraint ...``. This package is a replacement for the currently used
``dill`` package. Cloudpickle offers a strong advantage through its focus on the standard pickling protocol, ensuring
wider compatibility and smoother data exchange, while still effectively handling common Python objects and global
variables within functions.
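
For illustration (a hedged sketch, not part of the documented example): a task decorated with ``@task.virtualenv``
could opt into cloudpickle as below. ``use_cloudpickle`` is the operator parameter added in this PR and is assumed
here to pass through the decorator unchanged.

```python
from airflow.decorators import task

@task.virtualenv(requirements=["cloudpickle"], use_cloudpickle=True)
def count_lines(text: str) -> int:
    # Both the argument and the returned int travel through cloudpickle.
    return len(text.splitlines())
```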

.. tip::
The ``@task.virtualenv`` decorator is recommended over the classic ``PythonVirtualenvOperator``
to execute Python callables inside new Python virtual environments.
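
To make the compatibility claim above concrete, here is a small self-contained sketch (plain Python, independent of
Airflow) of cloudpickle serializing a closure that the standard ``pickle`` module rejects:

```python
import pickle

import cloudpickle

def make_adder(n: int):
    def adder(x: int) -> int:
        return x + n  # captures the free variable n
    return adder

add_two = make_adder(2)

try:
    pickle.dumps(add_two)  # pickle cannot serialize a nested function
except (pickle.PicklingError, AttributeError) as exc:
    print(f"pickle failed: {exc}")

payload = cloudpickle.dumps(add_two)  # cloudpickle serializes the closure by value
restored = pickle.loads(payload)  # the payload is standard pickle data
print(restored(40))  # prints 42
```

Because cloudpickle emits standard pickle data, the receiving side only needs ``pickle`` to deserialize, which is
what makes it a drop-in replacement in the operators above.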
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -255,6 +255,7 @@ cloudant
cloudbuild
CloudBuildClient
cloudml
cloudpickle
cloudsqldatabehook
CloudTasksClient
Cloudwatch
3 changes: 3 additions & 0 deletions hatch_build.py
@@ -75,6 +75,9 @@
# Cgroupspy 0.2.2 added Python 3.10 compatibility
"cgroupspy>=0.2.2",
],
"cloudpickle": [
"cloudpickle",
],
"deprecated-api": [
"requests>=2.27.0,<3",
],
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -78,9 +78,9 @@ dynamic = ["version", "optional-dependencies", "dependencies"]
#
# START CORE EXTRAS HERE
#
# aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, deprecated-api, github-enterprise,
# google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic, rabbitmq, s3fs,
# saml, sentry, statsd, uv, virtualenv
# aiobotocore, apache-atlas, apache-webhdfs, async, cgroups, cloudpickle, deprecated-api, github-
# enterprise, google-auth, graphviz, kerberos, ldap, leveldb, otel, pandas, password, pydantic,
# rabbitmq, s3fs, saml, sentry, statsd, uv, virtualenv
#
# END CORE EXTRAS HERE
#