Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 60 additions & 19 deletions providers/git/src/airflow/providers/git/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from __future__ import annotations

import os
import shutil
from contextlib import nullcontext
from pathlib import Path
from urllib.parse import urlparse

import structlog
from git import Repo
from git.exc import BadName, GitCommandError, NoSuchPathError
from git.exc import BadName, GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from tenacity import retry, retry_if_exception_type, stop_after_attempt

from airflow.dag_processing.bundles.base import BaseDagBundle
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -91,11 +93,21 @@ def _initialize(self):
with self.lock():
cm = self.hook.configure_hook_env() if self.hook else nullcontext()
with cm:
self._clone_bare_repo_if_required()
try:
self._clone_bare_repo_if_required()
except GitCommandError as e:
raise RuntimeError("Error cloning repository") from e
except InvalidGitRepositoryError as e:
raise RuntimeError(f"Invalid git repository at {self.bare_repo_path}") from e
self._ensure_version_in_bare_repo()
self.bare_repo.close()

self._clone_repo_if_required()
try:
self._clone_repo_if_required()
except GitCommandError as e:
raise RuntimeError("Error cloning repository") from e
except InvalidGitRepositoryError as e:
raise RuntimeError(f"Invalid git repository at {self.repo_path}") from e
self.repo.git.checkout(self.tracking_ref)
self._log.debug("bundle initialize", version=self.version)
if self.version:
Expand All @@ -113,36 +125,65 @@ def initialize(self) -> None:
self._initialize()
super().initialize()

@retry(
retry=retry_if_exception_type((InvalidGitRepositoryError, GitCommandError)),
stop=stop_after_attempt(2),
reraise=True,
)
def _clone_repo_if_required(self) -> None:
if not os.path.exists(self.repo_path):
self._log.info("Cloning repository", repo_path=self.repo_path, bare_repo_path=self.bare_repo_path)
try:
try:
if not os.path.exists(self.repo_path):
self._log.info(
"Cloning repository", repo_path=self.repo_path, bare_repo_path=self.bare_repo_path
)
Repo.clone_from(
url=self.bare_repo_path,
to_path=self.repo_path,
)
except NoSuchPathError as e:
# Protection should the bare repo be removed manually
raise AirflowException("Repository path: %s not found", self.bare_repo_path) from e
else:
self._log.debug("repo exists", repo_path=self.repo_path)
self.repo = Repo(self.repo_path)

else:
self._log.debug("repo exists", repo_path=self.repo_path)
self.repo = Repo(self.repo_path)
except NoSuchPathError as e:
# Protection should the bare repo be removed manually
raise AirflowException("Repository path: %s not found", self.bare_repo_path) from e
except (InvalidGitRepositoryError, GitCommandError) as e:
self._log.warning(
"Repository clone/open failed, cleaning up and retrying",
repo_path=self.repo_path,
exc=e,
)
if os.path.exists(self.repo_path):
shutil.rmtree(self.repo_path)
raise

@retry(
retry=retry_if_exception_type((InvalidGitRepositoryError, GitCommandError)),
stop=stop_after_attempt(2),
reraise=True,
)
def _clone_bare_repo_if_required(self) -> None:
if not self.repo_url:
raise AirflowException(f"Connection {self.git_conn_id} doesn't have a host url")
if not os.path.exists(self.bare_repo_path):
self._log.info("Cloning bare repository", bare_repo_path=self.bare_repo_path)
try:

try:
if not os.path.exists(self.bare_repo_path):
self._log.info("Cloning bare repository", bare_repo_path=self.bare_repo_path)
Repo.clone_from(
url=self.repo_url,
to_path=self.bare_repo_path,
bare=True,
env=self.hook.env if self.hook else None,
)
except GitCommandError as e:
raise AirflowException("Error cloning repository") from e
self.bare_repo = Repo(self.bare_repo_path)
self.bare_repo = Repo(self.bare_repo_path)
except (InvalidGitRepositoryError, GitCommandError) as e:
self._log.warning(
"Bare repository clone/open failed, cleaning up and retrying",
bare_repo_path=self.bare_repo_path,
exc=e,
)
if os.path.exists(self.bare_repo_path):
shutil.rmtree(self.bare_repo_path)
raise

def _ensure_version_in_bare_repo(self) -> None:
if not self.version:
Expand Down
66 changes: 64 additions & 2 deletions providers/git/tests/unit/git/bundles/test_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import pytest
from git import Repo
from git.exc import GitCommandError, NoSuchPathError
from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError

from airflow.dag_processing.bundles.base import get_bundle_storage_root_path
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -653,7 +653,7 @@ def test_clone_bare_repo_git_command_error(self, mock_githook):
mock_clone.side_effect = GitCommandError("clone", "Simulated error")
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="main")
with pytest.raises(
AirflowException,
RuntimeError,
match=re.escape("Error cloning repository"),
):
bundle.initialize()
Expand Down Expand Up @@ -745,3 +745,65 @@ def _fake_clone_from(*_, **kwargs):
bundle._clone_bare_repo_if_required()
_, kwargs = mock_gitRepo.clone_from.call_args
assert kwargs["env"] == EXPECTED_ENV

@mock.patch("airflow.providers.git.bundles.git.GitHook")
@mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
@mock.patch("airflow.providers.git.bundles.git.os.path.exists")
def test_clone_bare_repo_invalid_repository_error_retry(self, mock_exists, mock_rmtree, mock_githook):
"""Test that InvalidGitRepositoryError triggers cleanup and retry."""
mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
mock_githook.return_value.env = {}

# Set up exists to return True for the bare repo path (simulating corrupted repo exists)
mock_exists.return_value = True

with mock.patch("airflow.providers.git.bundles.git.Repo") as mock_repo_class:
# First call to Repo() raises InvalidGitRepositoryError, second call succeeds
mock_repo_class.side_effect = [
InvalidGitRepositoryError("Invalid git repository"),
mock.MagicMock(), # Second attempt succeeds
]

# Mock successful clone_from for the retry attempt
mock_repo_class.clone_from = mock.MagicMock()

bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="main")

# This should not raise an exception due to retry logic
bundle._clone_bare_repo_if_required()

# Verify cleanup was called
mock_rmtree.assert_called_once_with(bundle.bare_repo_path)

# Verify Repo was called twice (failed attempt + retry)
assert mock_repo_class.call_count == 2

@mock.patch("airflow.providers.git.bundles.git.GitHook")
@mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
@mock.patch("airflow.providers.git.bundles.git.os.path.exists")
def test_clone_bare_repo_invalid_repository_error_retry_fails(
self, mock_exists, mock_rmtree, mock_githook
):
"""Test that InvalidGitRepositoryError after retry is re-raised (wrapped in AirflowException by caller)."""
mock_githook.return_value.repo_url = "git@github.com:apache/airflow.git"
mock_githook.return_value.env = {}

# Set up exists to return True for the bare repo path
mock_exists.return_value = True

with mock.patch("airflow.providers.git.bundles.git.Repo") as mock_repo_class:
# Both calls to Repo() raise InvalidGitRepositoryError
mock_repo_class.side_effect = InvalidGitRepositoryError("Invalid git repository")

bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="main")

# The raw exception is raised by the method itself, but wrapped by _initialize
with pytest.raises(InvalidGitRepositoryError, match="Invalid git repository"):
bundle._clone_bare_repo_if_required()

# Verify cleanup was called twice (once for each failed attempt)
assert mock_rmtree.call_count == 2
mock_rmtree.assert_called_with(bundle.bare_repo_path)

# Verify Repo was called twice (failed attempt + failed retry)
assert mock_repo_class.call_count == 2
Loading