Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,23 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
for name in self._bundle_config.keys():
if bundle := stored.pop(name, None):
bundle.active = True
new_template, new_params = _extract_and_sign_template(name)
try:
new_template, new_params = _extract_and_sign_template(name)
except Exception as e:
self.log.exception("Error creating bundle '%s': %s", name, e)
continue
if new_template != bundle.signed_url_template:
bundle.signed_url_template = new_template
self.log.debug("Updated URL template for bundle %s", name)
if new_params != bundle.template_params:
bundle.template_params = new_params
self.log.debug("Updated template parameters for bundle %s", name)
else:
new_template, new_params = _extract_and_sign_template(name)
try:
new_template, new_params = _extract_and_sign_template(name)
except Exception as e:
self.log.exception("Error creating bundle '%s': %s", name, e)
continue
new_bundle = DagBundleModel(name=name)
new_bundle.signed_url_template = new_template
new_bundle.template_params = new_params
Expand Down Expand Up @@ -280,7 +288,12 @@ def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]:
:return: list of DAG bundles.
"""
for name, (class_, kwargs) in self._bundle_config.items():
yield class_(name=name, version=None, **kwargs)
try:
yield class_(name=name, version=None, **kwargs)
except Exception as e:
self.log.exception("Error creating bundle '%s': %s", name, e)
# Skip this bundle and continue with others
continue

def view_url(self, name: str, version: str | None = None) -> str | None:
warnings.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,50 @@ def test_example_dags_name_is_reserved():
with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(reserved_name_config)}):
with pytest.raises(AirflowConfigException, match="Bundle name 'example_dags' is a reserved name."):
DagBundlesManager().parse_config()


class FailingBundle(BaseDagBundle):
"""Test bundle that raises an exception during initialization."""

def __init__(self, *, should_fail: bool = True, **kwargs):
super().__init__(**kwargs)
if should_fail:
raise ValueError("Bundle creation failed for testing")

def refresh(self):
pass

def get_current_version(self):
return None

@property
def path(self):
return "/tmp/failing"


FAILING_BUNDLE_CONFIG = [
{
"name": "failing-bundle",
"classpath": "unit.dag_processing.bundles.test_dag_bundle_manager.FailingBundle",
"kwargs": {"should_fail": True, "refresh_interval": 1},
}
]


@conf_vars({("core", "LOAD_EXAMPLES"): "False"})
@pytest.mark.db_test
def test_multiple_bundles_one_fails(clear_db, session):
"""Test that when one bundle fails to create, other bundles still load successfully."""
mix_config = BASIC_BUNDLE_CONFIG + FAILING_BUNDLE_CONFIG

with patch.dict(os.environ, {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(mix_config)}):
manager = DagBundlesManager()

bundles = list(manager.get_all_dag_bundles())
assert len(bundles) == 1
assert bundles[0].name == "my-test-bundle"
assert isinstance(bundles[0], BasicBundle)

manager.sync_bundles_to_db()
bundle_names = {b.name for b in session.query(DagBundleModel).all()}
assert bundle_names == {"my-test-bundle"}
5 changes: 3 additions & 2 deletions providers/git/src/airflow/providers/git/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ def __init__(
self.hook: GitHook | None = None
try:
self.hook = GitHook(git_conn_id=git_conn_id or "git_default", repo_url=self.repo_url)
except Exception as e:
self._log.warning("Could not create GitHook", conn_id=git_conn_id, exc=e)
except Exception:
# re raise so exception propagates immediately with clear error message
raise

if self.hook and self.hook.repo_url:
self.repo_url = self.hook.repo_url
Expand Down
35 changes: 21 additions & 14 deletions providers/git/tests/unit/git/bundles/test_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
from airflow.models import Connection
from airflow.providers.git.bundles.git import GitDagBundle
from airflow.providers.git.hooks.git import GitHook
from airflow.sdk.exceptions import ErrorType
from airflow.sdk.execution_time.comms import ErrorResponse

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
Expand Down Expand Up @@ -673,20 +671,29 @@ def test_clone_repo_no_such_path_error(self, mock_githook):

@patch.dict(os.environ, {"AIRFLOW_CONN_MY_TEST_GIT": '{"host": "something", "conn_type": "git"}'})
@pytest.mark.parametrize(
"conn_id, expected_hook_type",
[("my_test_git", GitHook), ("something-else", type(None))],
("conn_id", "expected_hook_type", "exception_expected"),
[
("my_test_git", GitHook, False),
("something-else", None, True),
],
)
def test_repo_url_access_missing_connection_doesnt_error(
self, conn_id, expected_hook_type, mock_supervisor_comms
def test_repo_url_access_missing_connection_raises_exception(
self, conn_id, expected_hook_type, exception_expected
):
if expected_hook_type is type(None):
mock_supervisor_comms.send.return_value = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND)
bundle = GitDagBundle(
name="testa",
tracking_ref="main",
git_conn_id=conn_id,
)
assert isinstance(bundle.hook, expected_hook_type)
if exception_expected:
with pytest.raises(Exception, match="The conn_id `something-else` isn't defined"):
GitDagBundle(
name="testa",
tracking_ref="main",
git_conn_id=conn_id,
)
else:
bundle = GitDagBundle(
name="testa",
tracking_ref="main",
git_conn_id=conn_id,
)
assert isinstance(bundle.hook, expected_hook_type)

@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_lock_used(self, mock_githook, git_repo):
Expand Down
Loading