diff --git a/providers/git/src/airflow/providers/git/bundles/git.py b/providers/git/src/airflow/providers/git/bundles/git.py
index bd8dba4731067..89c7f7af08444 100644
--- a/providers/git/src/airflow/providers/git/bundles/git.py
+++ b/providers/git/src/airflow/providers/git/bundles/git.py
@@ -17,13 +17,15 @@
 from __future__ import annotations
 
 import os
+import shutil
 from contextlib import nullcontext
 from pathlib import Path
 from urllib.parse import urlparse
 
 import structlog
 from git import Repo
-from git.exc import BadName, GitCommandError, NoSuchPathError
+from git.exc import BadName, GitCommandError, InvalidGitRepositoryError, NoSuchPathError
+from tenacity import retry, retry_if_exception_type, stop_after_attempt
 
 from airflow.dag_processing.bundles.base import BaseDagBundle
 from airflow.exceptions import AirflowException
@@ -91,11 +93,21 @@ def _initialize(self):
         with self.lock():
             cm = self.hook.configure_hook_env() if self.hook else nullcontext()
             with cm:
-                self._clone_bare_repo_if_required()
+                try:
+                    self._clone_bare_repo_if_required()
+                except GitCommandError as e:
+                    raise RuntimeError("Error cloning repository") from e
+                except InvalidGitRepositoryError as e:
+                    raise RuntimeError(f"Invalid git repository at {self.bare_repo_path}") from e
                 self._ensure_version_in_bare_repo()
             self.bare_repo.close()
 
-            self._clone_repo_if_required()
+            try:
+                self._clone_repo_if_required()
+            except GitCommandError as e:
+                raise RuntimeError("Error cloning repository") from e
+            except InvalidGitRepositoryError as e:
+                raise RuntimeError(f"Invalid git repository at {self.repo_path}") from e
             self.repo.git.checkout(self.tracking_ref)
             self._log.debug("bundle initialize", version=self.version)
             if self.version:
@@ -113,36 +125,83 @@ def initialize(self) -> None:
         self._initialize()
         super().initialize()
 
+    @retry(
+        retry=retry_if_exception_type((InvalidGitRepositoryError, GitCommandError)),
+        stop=stop_after_attempt(2),
+        reraise=True,
+    )
     def _clone_repo_if_required(self) -> None:
-        if not os.path.exists(self.repo_path):
-            self._log.info("Cloning repository", repo_path=self.repo_path, bare_repo_path=self.bare_repo_path)
-            try:
+        """
+        Clone the working repository from the bare repository if it is missing, then open it.
+
+        If cloning or opening fails with ``InvalidGitRepositoryError`` or ``GitCommandError``,
+        any directory at ``repo_path`` is removed and the error is re-raised, so that the single
+        tenacity retry (two attempts in total) starts from a clean state.
+
+        :raises AirflowException: if git reports ``NoSuchPathError`` (e.g. the bare repo was removed).
+        """
+        try:
+            if not os.path.exists(self.repo_path):
+                self._log.info(
+                    "Cloning repository", repo_path=self.repo_path, bare_repo_path=self.bare_repo_path
+                )
                 Repo.clone_from(
                     url=self.bare_repo_path,
                     to_path=self.repo_path,
                 )
-            except NoSuchPathError as e:
-                # Protection should the bare repo be removed manually
-                raise AirflowException("Repository path: %s not found", self.bare_repo_path) from e
-        else:
-            self._log.debug("repo exists", repo_path=self.repo_path)
-        self.repo = Repo(self.repo_path)
-
+            else:
+                self._log.debug("repo exists", repo_path=self.repo_path)
+            self.repo = Repo(self.repo_path)
+        except NoSuchPathError as e:
+            # Protection should the bare repo be removed manually
+            raise AirflowException(f"Repository path: {self.bare_repo_path} not found") from e
+        except (InvalidGitRepositoryError, GitCommandError) as e:
+            self._log.warning(
+                "Repository clone/open failed, cleaning up and retrying",
+                repo_path=self.repo_path,
+                exc=e,
+            )
+            if os.path.exists(self.repo_path):
+                shutil.rmtree(self.repo_path)
+            raise
+
+    @retry(
+        retry=retry_if_exception_type((InvalidGitRepositoryError, GitCommandError)),
+        stop=stop_after_attempt(2),
+        reraise=True,
+    )
     def _clone_bare_repo_if_required(self) -> None:
+        """
+        Clone the bare repository from ``repo_url`` if it is missing, then open it.
+
+        If cloning or opening fails with ``InvalidGitRepositoryError`` or ``GitCommandError``,
+        any directory at ``bare_repo_path`` is removed and the error is re-raised, so that the
+        single tenacity retry (two attempts in total) starts from a clean state.
+
+        :raises AirflowException: if ``repo_url`` is empty.
+        """
         if not self.repo_url:
             raise AirflowException(f"Connection {self.git_conn_id} doesn't have a host url")
-        if not os.path.exists(self.bare_repo_path):
-            self._log.info("Cloning bare repository", bare_repo_path=self.bare_repo_path)
-            try:
+
+        try:
+            if not os.path.exists(self.bare_repo_path):
+                self._log.info("Cloning bare repository", bare_repo_path=self.bare_repo_path)
                 Repo.clone_from(
                     url=self.repo_url,
                     to_path=self.bare_repo_path,
                     bare=True,
                     env=self.hook.env if self.hook else None,
                 )
-            except GitCommandError as e:
-                raise AirflowException("Error cloning repository") from e
-        self.bare_repo = Repo(self.bare_repo_path)
+            self.bare_repo = Repo(self.bare_repo_path)
+        except (InvalidGitRepositoryError, GitCommandError) as e:
+            self._log.warning(
+                "Bare repository clone/open failed, cleaning up and retrying",
+                bare_repo_path=self.bare_repo_path,
+                exc=e,
+            )
+            if os.path.exists(self.bare_repo_path):
+                shutil.rmtree(self.bare_repo_path)
+            raise
 
     def _ensure_version_in_bare_repo(self) -> None:
         if not self.version:
diff --git a/providers/git/tests/unit/git/bundles/test_git.py b/providers/git/tests/unit/git/bundles/test_git.py
index 950388f6d6c6e..cf85e71758b7f 100644
--- a/providers/git/tests/unit/git/bundles/test_git.py
+++ b/providers/git/tests/unit/git/bundles/test_git.py
@@ -26,7 +26,7 @@
 
 import pytest
 from git import Repo
-from git.exc import GitCommandError, NoSuchPathError
+from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
 
 from airflow.dag_processing.bundles.base import get_bundle_storage_root_path
 from airflow.exceptions import AirflowException
@@ -653,7 +653,7 @@ def test_clone_bare_repo_git_command_error(self, mock_githook):
             mock_clone.side_effect = GitCommandError("clone", "Simulated error")
             bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="main")
             with pytest.raises(
-                AirflowException,
+                RuntimeError,
                 match=re.escape("Error cloning repository"),
             ):
                 bundle.initialize()
@@ -745,3 +745,65 @@ def _fake_clone_from(*_, **kwargs):
         bundle._clone_bare_repo_if_required()
         _, kwargs = mock_gitRepo.clone_from.call_args
         assert kwargs["env"] == EXPECTED_ENV
+
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    def test_clone_bare_repo_invalid_repository_error_retry(self, mock_exists, mock_rmtree, mock_githook):
+        """Test that InvalidGitRepositoryError triggers cleanup and retry."""
+        mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
+        mock_githook.return_value.env = {}
+
+        # Set up exists to return True for the bare repo path (simulating corrupted repo exists)
+        mock_exists.return_value = True
+
+        with mock.patch("airflow.providers.git.bundles.git.Repo") as mock_repo_class:
+            # First call to Repo() raises InvalidGitRepositoryError, second call succeeds
+            mock_repo_class.side_effect = [
+                InvalidGitRepositoryError("Invalid git repository"),
+                mock.MagicMock(),  # Second attempt succeeds
+            ]
+
+            # Mock successful clone_from for the retry attempt
+            mock_repo_class.clone_from = mock.MagicMock()
+
+            bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="main")
+
+            # This should not raise an exception due to retry logic
+            bundle._clone_bare_repo_if_required()
+
+            # Verify cleanup was called
+            mock_rmtree.assert_called_once_with(bundle.bare_repo_path)
+
+            # Verify Repo was called twice (failed attempt + retry)
+            assert mock_repo_class.call_count == 2
+
+    @mock.patch("airflow.providers.git.bundles.git.GitHook")
+    @mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
+    @mock.patch("airflow.providers.git.bundles.git.os.path.exists")
+    def test_clone_bare_repo_invalid_repository_error_retry_fails(
+        self, mock_exists, mock_rmtree, mock_githook
+    ):
+        """Test that InvalidGitRepositoryError after retry is re-raised (wrapped in RuntimeError by caller)."""
+        mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
+        mock_githook.return_value.env = {}
+
+        # Set up exists to return True for the bare repo path
+        mock_exists.return_value = True
+
+        with mock.patch("airflow.providers.git.bundles.git.Repo") as mock_repo_class:
+            # Both calls to Repo() raise InvalidGitRepositoryError
+            mock_repo_class.side_effect = InvalidGitRepositoryError("Invalid git repository")
+
+            bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="main")
+
+            # The raw exception is raised by the method itself, but wrapped by _initialize
+            with pytest.raises(InvalidGitRepositoryError, match="Invalid git repository"):
+                bundle._clone_bare_repo_if_required()
+
+            # Verify cleanup was called twice (once for each failed attempt)
+            assert mock_rmtree.call_count == 2
+            mock_rmtree.assert_called_with(bundle.bare_repo_path)
+
+            # Verify Repo was called twice (failed attempt + failed retry)
+            assert mock_repo_class.call_count == 2