diff --git a/airflow-core/docs/administration-and-deployment/dag-bundles.rst b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
index a8a962eb8c916..2dcbd1f5b98fa 100644
--- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst
+++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
@@ -122,6 +122,32 @@ Starting Airflow 3.0.2 git is pre installed in the base image. However, if you a
 
     ENV GIT_PYTHON_REFRESH=quiet
 
+Using DAG Bundles with User Impersonation
+-----------------------------------------
+
+When using ``run_as_user`` (user impersonation) with DAG bundles, ensure proper file permissions
+are configured so that impersonated users can access bundle files created by the main Airflow process.
+
+1. All impersonated users and the Airflow user should be in the same group
+2. Configure appropriate umask settings (e.g., ``umask 0002``)
+3. Set :ref:`config:dag_processor__dag_bundle_new_folder_permissions` to ``0o775`` (default)
+4. Set :ref:`config:dag_processor__dag_bundle_new_file_permissions` to ``0o664`` (default)
+
+Example configuration:
+
+.. code-block:: ini
+
+    [dag_processor]
+    dag_bundle_new_folder_permissions = 0o775
+    dag_bundle_new_file_permissions = 0o664
+
+.. note::
+
+    This permission-based approach is a temporary solution. Future versions of Airflow
+    will handle multi-user access through supervisor-based bundle operations, eliminating
+    the need for shared group permissions.
+
+
 Writing custom Dag bundles
 --------------------------
 
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index dac144f54cbf0..38e615c8acc63 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2465,6 +2465,36 @@ dag_processor:
       example: "/tmp/some-place"
       default: ~
 
+    dag_bundle_new_folder_permissions:
+      description: |
+        Permissions to set on new DAG bundle directories. When using user impersonation
+        (``run_as_user``), these should be group-writable (e.g., ``0o775``) so that
+        impersonated users can access the bundle files.
+
+        The value should be an octal string (e.g., ``0o775``). The default grants the owner
+        and group read/write/execute, and everyone else read/execute.
+
+        This is similar to ``[logging] file_task_handler_new_folder_permissions``.
+      version_added: 3.2.0
+      type: string
+      example: "0o775"
+      default: "0o775"
+
+    dag_bundle_new_file_permissions:
+      description: |
+        Permissions to set on new DAG bundle files (lock files, tracking files, etc.).
+        When using user impersonation (``run_as_user``), these should be group-writable
+        (e.g., ``0o664``) so that impersonated users can access the files.
+
+        The value should be an octal string (e.g., ``0o664``). The default grants the owner
+        and group read/write, and everyone else read-only access.
+
+        This is similar to ``[logging] file_task_handler_new_file_permissions``.
+      version_added: 3.2.0
+      type: string
+      example: "0o664"
+      default: "0o664"
+
     dag_bundle_config_list:
       description: |
         List of backend configs. Must supply name, classpath, and kwargs for each backend.
diff --git a/airflow-core/src/airflow/dag_processing/bundles/base.py b/airflow-core/src/airflow/dag_processing/bundles/base.py
index 2df62bcf2b8f8..7993329d88adc 100644
--- a/airflow-core/src/airflow/dag_processing/bundles/base.py
+++ b/airflow-core/src/airflow/dag_processing/bundles/base.py
@@ -76,6 +76,35 @@ def get_bundle_version_path(bundle_name: str, version: str) -> Path:
     return base_folder / version
 
 
+def get_bundle_permissions() -> tuple[int, int]:
+    """
+    Return configured permissions for bundle directories and files.
+
+    When using user impersonation (run_as_user), bundle directories and files
+    should be group-writable so that impersonated users can access them.
+
+    :return: Tuple of (folder_permissions, file_permissions) as integers
+    """
+    # Fallback needed for backward compatibility with old config files
+    folder_perms = int(conf.get("dag_processor", "dag_bundle_new_folder_permissions", fallback="0o775"), 8)
+    file_perms = int(conf.get("dag_processor", "dag_bundle_new_file_permissions", fallback="0o664"), 8)
+    return folder_perms, file_perms
+
+
+def apply_bundle_permissions(path: Path, is_directory: bool = True) -> None:
+    """
+    Apply configured bundle permissions to a path.
+
+    :param path: The path to apply permissions to
+    :param is_directory: Whether the path is a directory (True) or file (False)
+    """
+    folder_perms, file_perms = get_bundle_permissions()
+    try:
+        path.chmod(folder_perms if is_directory else file_perms)
+    except OSError as err:
+        log.debug("Could not set permissions on %s: %s", path, err)
+
+
 @dataclass(frozen=True)
 class TrackedBundleVersionInfo:
     """
@@ -377,9 +406,11 @@ def lock(self):
 
         lock_dir_path = get_bundle_storage_root_path() / "_locks"
         lock_dir_path.mkdir(parents=True, exist_ok=True)
+        apply_bundle_permissions(lock_dir_path, is_directory=True)
         lock_file_path = lock_dir_path / f"{self.name}.lock"
 
         with open(lock_file_path, "w") as lock_file:
+            apply_bundle_permissions(lock_file_path, is_directory=False)
             # Exclusive lock - blocks until it is available
             fcntl.flock(lock_file, fcntl.LOCK_EX)
             try:
@@ -426,12 +457,17 @@ def _update_version_file(self):
         if TYPE_CHECKING:
             assert self.lock_file_path
         self.lock_file_path.parent.mkdir(parents=True, exist_ok=True)
+        tracking_root = STALE_BUNDLE_TRACKING_FOLDER
+        if tracking_root.exists():
+            apply_bundle_permissions(tracking_root, is_directory=True)
+        apply_bundle_permissions(self.lock_file_path.parent, is_directory=True)
 
         with tempfile.TemporaryDirectory() as td:
             temp_file = Path(td, self.lock_file_path)
             now = pendulum.now(tz=pendulum.UTC)
             temp_file.write_text(now.isoformat())
             os.replace(temp_file, self.lock_file_path)
+            apply_bundle_permissions(self.lock_file_path, is_directory=False)
 
     def acquire(self):
         if not self.version:
diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_base.py b/airflow-core/tests/unit/dag_processing/bundles/test_base.py
index 58f7c233337c8..2a34dd59f42bb 100644
--- a/airflow-core/tests/unit/dag_processing/bundles/test_base.py
+++ b/airflow-core/tests/unit/dag_processing/bundles/test_base.py
@@ -24,6 +24,7 @@
 import time
 from datetime import timedelta
 from pathlib import Path
+from stat import S_IMODE
 from unittest.mock import patch
 
 import pytest
@@ -34,6 +35,8 @@
     BaseDagBundle,
     BundleUsageTrackingManager,
     BundleVersionLock,
+    apply_bundle_permissions,
+    get_bundle_permissions,
     get_bundle_storage_root_path,
 )
 
@@ -268,3 +271,110 @@ def test_that_stale_bundles_are_removed(
         assert len(lock_files) == expected_remaining
         bundle_folders = list(b.versions_dir.iterdir())
         assert len(bundle_folders) == expected_remaining
+
+
+class TestBundlePermissions:
+    """Tests for bundle permission helper functions."""
+
+    def test_get_bundle_permissions_default(self):
+        """Test that default permissions are returned when not configured."""
+        with conf_vars(
+            {
+                ("dag_processor", "dag_bundle_new_folder_permissions"): None,
+                ("dag_processor", "dag_bundle_new_file_permissions"): None,
+            }
+        ):
+            folder_perms, file_perms = get_bundle_permissions()
+            assert folder_perms == 0o775
+            assert file_perms == 0o664
+
+    def test_get_bundle_permissions_custom(self):
+        """Test that custom permissions are read from config."""
+        with conf_vars(
+            {
+                ("dag_processor", "dag_bundle_new_folder_permissions"): "0o755",
+                ("dag_processor", "dag_bundle_new_file_permissions"): "0o644",
+            }
+        ):
+            folder_perms, file_perms = get_bundle_permissions()
+            assert folder_perms == 0o755
+            assert file_perms == 0o644
+
+    def test_apply_bundle_permissions_directory(self, tmp_path: Path):
+        """Test that permissions are applied to directories."""
+        test_dir = tmp_path / "test_dir"
+        test_dir.mkdir()
+        # Set restrictive permissions first
+        test_dir.chmod(0o700)
+
+        with conf_vars(
+            {
+                ("dag_processor", "dag_bundle_new_folder_permissions"): "0o775",
+            }
+        ):
+            apply_bundle_permissions(test_dir, is_directory=True)
+            mode = S_IMODE(test_dir.stat().st_mode)
+            assert mode == 0o775
+
+    def test_apply_bundle_permissions_file(self, tmp_path: Path):
+        """Test that permissions are applied to files."""
+        test_file = tmp_path / "test_file"
+        test_file.touch()
+        # Set restrictive permissions first
+        test_file.chmod(0o600)
+
+        with conf_vars(
+            {
+                ("dag_processor", "dag_bundle_new_file_permissions"): "0o664",
+            }
+        ):
+            apply_bundle_permissions(test_file, is_directory=False)
+            mode = S_IMODE(test_file.stat().st_mode)
+            assert mode == 0o664
+
+    def test_lock_applies_permissions(self, bundle_temp_dir):
+        """Test that lock() applies configured permissions to lock directory and file."""
+        with conf_vars(
+            {
+                ("dag_processor", "dag_bundle_new_folder_permissions"): "0o775",
+                ("dag_processor", "dag_bundle_new_file_permissions"): "0o664",
+            }
+        ):
+            bundle = BasicBundle(name="permtest")
+            lock_dir = get_bundle_storage_root_path() / "_locks"
+            lock_file = lock_dir / f"{bundle.name}.lock"
+
+            with bundle.lock():
+                # Verify lock directory has correct permissions
+                dir_mode = S_IMODE(lock_dir.stat().st_mode)
+                assert dir_mode == 0o775
+
+                # Verify lock file has correct permissions
+                file_mode = S_IMODE(lock_file.stat().st_mode)
+                assert file_mode == 0o664
+
+    def test_bundle_version_lock_applies_permissions(self, bundle_temp_dir):
+        """Test that BundleVersionLock applies permissions to tracking directories."""
+        from airflow.dag_processing.bundles.base import STALE_BUNDLE_TRACKING_FOLDER
+
+        with conf_vars(
+            {
+                ("dag_processor", "dag_bundle_new_folder_permissions"): "0o775",
+                ("dag_processor", "dag_bundle_new_file_permissions"): "0o664",
+            }
+        ):
+            bundle_name = "permtest"
+            version = "v1"
+
+            with BundleVersionLock(bundle_name=bundle_name, bundle_version=version):
+                # Verify tracking directory has correct permissions
+                tracking_dir = STALE_BUNDLE_TRACKING_FOLDER / bundle_name
+                if tracking_dir.exists():
+                    dir_mode = S_IMODE(tracking_dir.stat().st_mode)
+                    assert dir_mode == 0o775
+
+                # Verify tracking file has correct permissions
+                tracking_file = tracking_dir / version
+                if tracking_file.exists():
+                    file_mode = S_IMODE(tracking_file.stat().st_mode)
+                    assert file_mode == 0o664