diff --git a/osbs/utils/__init__.py b/osbs/utils/__init__.py index 86749b28..a75773ea 100644 --- a/osbs/utils/__init__.py +++ b/osbs/utils/__init__.py @@ -12,6 +12,7 @@ import logging import os import os.path +from pathlib import Path import random import re import shutil @@ -69,6 +70,50 @@ def __repr__(self): return self.uri +def _enforce_sandbox(repo_root: str, *, remove_unsafe_symlinks: bool) -> None: + """ + Check that there are no symlinks that try to leave the cloned repository. + + :param (str | Path) repo_root: absolute path to root of cloned repository + :raises OsbsValidationException: if any symlink points outside of cloned repository + """ + for dirpath, subdirs, files in os.walk(repo_root): + dirpath = Path(dirpath) + + for entry in subdirs + files: + # the logic in here actually *requires* f-strings with `!r`. using + # `%r` DOES NOT WORK (tested) + # pylint: disable=logging-fstring-interpolation + + # apparently pylint doesn't understand Path + full_path = dirpath / entry # pylint: disable=old-division + + try: + real_path = full_path.resolve() + except RuntimeError as e: + if "Symlink loop from " in str(e): + logger.info(f"Symlink loop from {full_path!r}") + continue + logger.exception("RuntimeError encountered") + raise + + try: + real_path.relative_to(repo_root) + except ValueError: + # Unlike the real path, the full path is always relative to the root + relative_path = str(full_path.relative_to(repo_root)) + if remove_unsafe_symlinks: + full_path.unlink() + logger.warning( + f"The destination of {relative_path!r} is outside of cloned repository. " + "Removing...", + ) + else: + raise OsbsValidationException( + f"The destination of {relative_path!r} is outside of cloned repository", + ) + + @contextlib.contextmanager def checkout_git_repo(git_url, target_dir=None, commit=None, retry_times=GIT_MAX_RETRIES, branch=None, depth=None): diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index 833c91cc..8ab0bcf0 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -6,6 +6,8 @@ of the BSD license. See the LICENSE file for details. """ from __future__ import absolute_import +from typing import Union +from unittest import mock from flexmock import flexmock import os @@ -18,14 +20,17 @@ import logging from time import sleep import time +from pathlib import Path from textwrap import dedent from osbs.constants import REPO_CONTAINER_CONFIG, USER_WARNING_LEVEL from osbs.repo_utils import RepoInfo -from osbs.utils import (git_repo_humanish_part_from_uri, sanitize_strings_for_openshift, - make_name_from_git, get_instance_token_file_name, clone_git_repo, - get_repo_info, UserWarningsStore, ImageName, reset_git_repo) -from osbs.exceptions import OsbsException, OsbsCommitNotFound, OsbsLocallyModified +from osbs.utils import (_enforce_sandbox, git_repo_humanish_part_from_uri, + sanitize_strings_for_openshift, make_name_from_git, + get_instance_token_file_name, clone_git_repo, + get_repo_info, UserWarningsStore, ImageName, reset_git_repo,) +from osbs.exceptions import (OsbsException, OsbsCommitNotFound, OsbsLocallyModified, + OsbsValidationException,) from tests.constants import (TEST_DOCKERFILE_GIT, TEST_DOCKERFILE_SHA1, TEST_DOCKERFILE_INIT_SHA1, TEST_DOCKERFILE_BRANCH) import osbs.kerberos_ccache @@ -491,3 +496,112 @@ def test_store_user_warnings(logs, expected, wrong_input, caplog): user_warnings = str(user_warnings).splitlines() assert sorted(user_warnings) == sorted(expected) + + +class Symlink(str): + """ + Use this to create symlinks via write_file_tree(). + + The value of a Symlink instance is the target path (path to make a symlink to). + """ + + +def write_file_tree(tree_def: dict, rooted_at: Union[str, Path], *, exist_dirs_ok: bool = False): + """ + Write a file tree to disk. + + :param tree_def: Definition of file tree, see usage for intuitive examples + :param rooted_at: Root of file tree, must be an existing directory + :param exist_dirs_ok: If True, existing directories will not cause this function to fail + """ + root = Path(rooted_at) + for entry, value in tree_def.items(): + entry_path = root / entry + if isinstance(value, Symlink): + os.symlink(value, entry_path) + elif isinstance(value, str): + entry_path.write_text(value) + else: + entry_path.mkdir(exist_ok=exist_dirs_ok) + write_file_tree(value, entry_path) + + +@pytest.mark.parametrize( + "file_tree,bad_symlink", + [ + # good + pytest.param({}, None, id="empty-no-symlink"), + pytest.param( + {"symlink_to_self": Symlink(".")}, + None, + id="self-symlink-ok"), + pytest.param( + {"subdir": {"symlink_to_parent": Symlink("..")}}, + None, + id="parent-symlink-ok"), + pytest.param( + {"symlink_to_subdir": Symlink("subdir/some_file"), + "subdir": {"some_file": "foo"}}, + None, + id="subdir-symlink-ok"), + # bad + pytest.param( + {"symlink_to_parent": Symlink("..")}, + "symlink_to_parent", + id="parent-symlink-bad"), + pytest.param( + {"symlink_to_root": Symlink("/")}, + "symlink_to_root", + id="root-symlink-bad"), + pytest.param( + {"subdir": {"symlink_to_parent_parent": Symlink("../..")}}, + "subdir/symlink_to_parent_parent", + id="parent-parent-symlink-bad"), + pytest.param( + {"subdir": {"symlink_to_root": Symlink("/")}}, + "subdir/symlink_to_root", + id="subdir-root-symlink-bad"), + ], +) +def test_enforce_sandbox(file_tree, bad_symlink, tmp_path): + write_file_tree(file_tree, tmp_path) + if bad_symlink: + error = f"The destination of {bad_symlink!r} is outside of cloned repository" + with pytest.raises(OsbsValidationException, match=error): + _enforce_sandbox(tmp_path, remove_unsafe_symlinks=False) + assert Path(tmp_path / bad_symlink).exists() + _enforce_sandbox(tmp_path, remove_unsafe_symlinks=True) + assert not Path(tmp_path / bad_symlink).exists() + else: + _enforce_sandbox(tmp_path, remove_unsafe_symlinks=False) + _enforce_sandbox(tmp_path, remove_unsafe_symlinks=True) + + +def test_enforce_sandbox_symlink_loop(tmp_path, caplog): + workers_logger = logging.getLogger("cachito.workers.tasks.general") + workers_logger.disabled = False + workers_logger.setLevel(logging.INFO) + + file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")} + write_file_tree(file_tree, tmp_path) + _enforce_sandbox(tmp_path, remove_unsafe_symlinks=True) + assert "Symlink loop from " in caplog.text + + +@mock.patch("Path.resolve") +def test_enforce_sandbox_runtime_error(mock_resolve, tmp_path): + workers_logger = logging.getLogger("cachito.workers.tasks.general") + workers_logger.disabled = False + workers_logger.setLevel(logging.INFO) + + error = "RuntimeError is triggered" + + def side_effect(): + raise RuntimeError(error) + + mock_resolve.side_effect = side_effect + + file_tree = {"foo_b": Symlink("foo_a"), "foo_a": Symlink("foo_b")} + write_file_tree(file_tree, tmp_path) + with pytest.raises(RuntimeError, match=error): + _enforce_sandbox(tmp_path, remove_unsafe_symlinks=True)