Skip to content

Commit

Permalink
Hadle case when cached venv is used after the operator's upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
moiseenkov committed Apr 18, 2024
1 parent 863c5c3 commit e2eabbb
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,11 +684,12 @@ def __init__(
**kwargs,
)

def _requirements_list(self) -> list[str]:
def _requirements_list(self, exclude_cloudpickle: bool = False) -> list[str]:
"""Prepare a list of requirements that need to be installed for the virtual environment."""
requirements = [str(dependency) for dependency in self.requirements]
if not self.system_site_packages and self.use_cloudpickle and "cloudpickle" not in requirements:
requirements.append("cloudpickle")
if not exclude_cloudpickle:
if not self.system_site_packages and self.use_cloudpickle and "cloudpickle" not in requirements:
requirements.append("cloudpickle")
requirements.sort() # Ensure a hash is stable
return requirements

Expand All @@ -705,7 +706,7 @@ def _prepare_venv(self, venv_path: Path) -> None:
index_urls=self.index_urls,
)

def _calculate_cache_hash(self) -> tuple[str, str]:
def _calculate_cache_hash(self, exclude_cloudpickle: bool = False) -> tuple[str, str]:
"""Generate the hash of the cache folder to use.
The following factors are used as input for the hash:
Expand All @@ -719,7 +720,7 @@ def _calculate_cache_hash(self) -> tuple[str, str]:
Returns a hash and the data dict which is the base for the hash as text.
"""
hash_dict = {
"requirements_list": self._requirements_list(),
"requirements_list": self._requirements_list(exclude_cloudpickle=exclude_cloudpickle),
"pip_install_options": self.pip_install_options,
"index_urls": self.index_urls,
"cache_key": str(Variable.get("PythonVirtualenvOperator.cache_key", "")),
Expand Down Expand Up @@ -750,14 +751,22 @@ def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path:
self.log.info("Re-using cached Python virtual environment in %s", venv_path)
return venv_path

self.log.error(
"Unicorn alert: Found a previous virtual environment in %s "
"with the same hash but different parameters. Previous setup: '%s' / "
"Requested venv setup: '%s'. Please report a bug to airflow!",
venv_path,
previous_hash_data,
hash_data,
)
_, hash_data_before_upgrade = self._calculate_cache_hash(exclude_cloudpickle=True)
if previous_hash_data == hash_data_before_upgrade:
self.log.warning(
"Found a previous virtual environment in with outdated dependencies %s, "
"deleting and re-creating.",
venv_path,
)
else:
self.log.error(
"Unicorn alert: Found a previous virtual environment in %s "
"with the same hash but different parameters. Previous setup: '%s' / "
"Requested venv setup: '%s'. Please report a bug to airflow!",
venv_path,
previous_hash_data,
hash_data,
)
else:
self.log.warning(
"Found a previous (probably partial installed) virtual environment in %s, "
Expand Down

0 comments on commit e2eabbb

Please sign in to comment.