diff --git a/CHANGES/494.feature b/CHANGES/494.feature new file mode 100644 index 000000000..fe6aa02fc --- /dev/null +++ b/CHANGES/494.feature @@ -0,0 +1 @@ +Added support for cross repository blob mount. diff --git a/docs/workflows/push.rst b/docs/workflows/push.rst index b649ee8b9..c08bf6de8 100644 --- a/docs/workflows/push.rst +++ b/docs/workflows/push.rst @@ -12,6 +12,9 @@ image. Having disabled the token authentication, only users with staff privileges (i.e., administrators) are allowed to push content to the registry. +The registry supports cross repository blob mounting. When uploading blobs that already exist in +the registry as a part of a different repository, the content is not being uploaded but rather +referenced from another repository to reduce network traffic. :: diff --git a/pulp_container/app/authorization.py b/pulp_container/app/authorization.py index e44421432..c88f7e6cd 100644 --- a/pulp_container/app/authorization.py +++ b/pulp_container/app/authorization.py @@ -31,19 +31,19 @@ class AuthorizationService: according to a user's scope. """ - def __init__(self, user, service, scope): + def __init__(self, user, service, scopes): """ Store class-wide variables and initialize a dictionary used for determining permissions. Args: user (django.contrib.auth.models.User): Requesting user. service (str): Name of the service access is granted to. - scope (str): Scope of the resource that is to be accessed. + scopes (list): Scopes of the resources that are to be accessed. """ self.user = user self.service = service - self.scope = scope + self.scopes = scopes self.access_policy = RegistryAccessPolicy() self.actions_permissions = defaultdict( @@ -133,10 +133,21 @@ def determine_access(self): endpoint. """ - if not self.scope or self.scope.count(":") != 2: + if not self.scopes: return [] - typ, name, actions = self.scope.split(":") + if any(scope.count(":") != 2 for scope in self.scopes): + return [] + + permitted_scopes = [] + for scope in self.scopes: + permitted_scopes.extend(self.permit_scope(scope)) + + return permitted_scopes + + def permit_scope(self, scope): + """Permit a received access scope based on user's permissions.""" + typ, name, actions = scope.split(":") actions = set(actions.split(",")) permitted_actions = set() diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index e459d1e35..72757bba2 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -356,7 +356,7 @@ def get(self, request): service = request.query_params["service"] except KeyError: raise ParseError(detail="No service name provided.") - scope = request.query_params.get("scope", "") + scope = request.query_params.getlist("scope", []) authorization_service = AuthorizationService(self.request.user, service, scope) data = authorization_service.generate_token() @@ -554,12 +554,66 @@ def create(self, request, path): """ _, repository = self.get_dr_push(request, path, create=True) - upload = models.Upload(repository=repository, size=0) - upload.save() - response = UploadResponse(upload=upload, path=path, request=request) + if self.tries_to_mount_blob(request): + response = self.mount_blob(request, path, repository) + else: + upload = models.Upload(repository=repository, size=0) + upload.save() + response = UploadResponse(upload=upload, path=path, request=request) return response + @staticmethod + def tries_to_mount_blob(request): + """Check if a client is trying to perform cross repository blob mounting.""" + return (request.query_params.keys()) == {"from", "mount"} + + def mount_blob(self, request, path, repository): + """Mount a blob that is already present in another repository.""" + from_path = request.query_params["from"] + try: + distribution = models.ContainerDistribution.objects.get(base_path=from_path) + except models.ContainerDistribution.DoesNotExist: + raise RepositoryNotFound(name=path) + + try: + version = distribution.repository_version or distribution.repository.latest_version() + except AttributeError: + # the distribution does not contain reference to the source repository version + raise RepositoryNotFound(name=from_path) + + digest = request.query_params["mount"] + try: + blob = models.Blob.objects.get(digest=digest, pk__in=version.content) + except models.Blob.DoesNotExist: + raise BlobNotFound(digest=digest) + + dispatched_task = dispatch( + add_and_remove, + shared_resources=[version.repository], + exclusive_resources=[repository], + kwargs={ + "repository_pk": str(repository.pk), + "add_content_units": [str(blob.pk)], + "remove_content_units": [], + }, + ) + + # Wait a small amount of time + for dummy in range(3): + time.sleep(1) + task = Task.objects.get(pk=dispatched_task.pk) + if task.state == "completed": + task.delete() + return BlobResponse(blob, path, 201, request) + elif task.state in ["waiting", "running"]: + continue + else: + error = task.error + task.delete() + raise Exception(str(error)) + raise Throttled() + def partial_update(self, request, path, pk=None): """ Process a chunk that will be appended to an existing upload. diff --git a/pulp_container/tests/functional/api/test_rbac_repo_versions.py b/pulp_container/tests/functional/api/test_rbac_repo_versions.py index 687f86648..88e327e87 100644 --- a/pulp_container/tests/functional/api/test_rbac_repo_versions.py +++ b/pulp_container/tests/functional/api/test_rbac_repo_versions.py @@ -1,11 +1,12 @@ # coding=utf-8 """Tests that verify that RBAC for repository versions work properly.""" +import requests import unittest -from urllib.parse import urlparse +from urllib.parse import urlparse, urljoin -from pulp_smash import cli, config -from pulp_smash.pulp3.bindings import delete_orphans, monitor_task +from pulp_smash import api, cli, config +from pulp_smash.pulp3.bindings import delete_orphans, monitor_task, PulpTestCase from pulp_smash.pulp3.utils import gen_repo from pulpcore.client.pulp_container.exceptions import ApiException @@ -13,17 +14,22 @@ from pulp_container.tests.functional.api import rbac_base from pulp_container.tests.functional.constants import PULP_FIXTURE_1, REGISTRY_V2_REPO_PULP from pulp_container.tests.functional.utils import ( + TOKEN_AUTH_DISABLED, del_user, gen_container_client, gen_container_remote, gen_user, + BearerTokenAuth, + AuthenticationHeaderQueries, ) from pulpcore.client.pulp_container import ( ContainerContainerRepository, - ContentManifestsApi, + ContentBlobsApi, ContentTagsApi, + ContentManifestsApi, ContainerRepositorySyncURL, + DistributionsContainerApi, PulpContainerNamespacesApi, RepositoriesContainerApi, RepositoriesContainerPushApi, @@ -222,3 +228,146 @@ def test_repov_read(self): self.user_reader["repo_version_api"].read(repository.latest_version_href) with self.assertRaises(ApiException): self.user_helpless["repo_version_api"].read(repository.latest_version_href) + + +class PushCrossRepoBlobMountTestCase(PulpTestCase, rbac_base.BaseRegistryTest): + """A test case for verifying the cross repository blob mount functionality. + + The test case also verifies whether different access scopes are evaluated properly or not. For + instance, users who do not have permissions to pull and push content, they should not be able + to trigger the cross repository blob mount procedure. + """ + + @classmethod + def setUpClass(cls): + """Initialize class-wide variables and create a new repository by pushing content to it.""" + cls.cfg = config.get_config() + cls.client = api.Client(cls.cfg, api.code_handler) + cls.registry = cli.RegistryClient(cls.cfg) + cls.registry.raise_if_unsupported(unittest.SkipTest, "Tests require podman/docker") + + cls.registry_name = urlparse(cls.cfg.get_base_url()).netloc + + admin_user, admin_password = cls.cfg.pulp_auth + cls.user_admin = {"username": admin_user, "password": admin_password} + + api_client = gen_container_client() + api_client.configuration.username = cls.user_admin["username"] + api_client.configuration.password = cls.user_admin["password"] + + cls.distributions_api = DistributionsContainerApi(api_client) + cls.pushrepository_api = RepositoriesContainerPushApi(api_client) + cls.repo_version_api = RepositoriesContainerVersionsApi(api_client) + cls.blobs_api = ContentBlobsApi(api_client) + + cls._pull(f"{REGISTRY_V2_REPO_PULP}:manifest_a") + + local_url = f"{cls.registry_name}/test-1:manifest_a" + image_path = f"{REGISTRY_V2_REPO_PULP}:manifest_a" + cls._push(image_path, local_url, cls.user_admin) + + repository = cls.pushrepository_api.list(name="test-1").results[0] + cls.blobs = cls.blobs_api.list(repository_version=repository.latest_version_href).results + cls.distribution = cls.distributions_api.list(name="test-1").results[0] + + cls.user_pull = gen_user( + object_permissions=[ + ("container.pull_containerdistribution", cls.distribution.pulp_href) + ] + ) + cls.user_push = gen_user( + [ + "container.push_containerdistribution", + "container.namespace_push_containerdistribution", + ] + ) + cls.user_anon = gen_user([]) + + @classmethod + def tearDownClass(cls): + """Delete created users and a distribution that was created in the first stage.""" + monitor_task(cls.distributions_api.delete(cls.distribution.pulp_href).task) + + delete_orphans() + del_user(cls.user_pull) + del_user(cls.user_push) + del_user(cls.user_anon) + + def tearDown(self): + """Delete a newly created repository if exists.""" + distributions = self.distributions_api.list(name="test-2").results + if distributions: + monitor_task(self.distributions_api.delete(distributions[0].pulp_href).task) + delete_orphans() + + def test_mount_blobs_as_admin(self): + """Test if an admin user can trigger blob mounting successfully.""" + admin_basic_auth = (self.user_admin["username"], self.user_admin["password"]) + for i, blob in enumerate(self.blobs, 1): + content_response, token_auth = self.mount_blob(blob, admin_basic_auth) + assert content_response.status_code == 201 + assert content_response.text == "" + + blob_url = f"/v2/test-2/blobs/{blob.digest}" + url = urljoin(self.cfg.get_base_url(), blob_url) + content_response = self.client.head(url, auth=token_auth) + assert content_response.status_code == 200 + + repo_href = self.distributions_api.list(name="test-2").results[0].repository + version_href = self.pushrepository_api.read(repo_href).latest_version_href + assert f"{repo_href}versions/{i}/" == version_href + + added_blobs = self.blobs_api.list(repository_version_added=version_href).results + assert len(added_blobs) == 1 + assert added_blobs[0].digest == blob.digest + + @unittest.skipIf(TOKEN_AUTH_DISABLED, "Only administrators can push content to the Registry.") + def test_mount_blobs_as_user_pull(self): + """Test if a user with pull permission, but not push permission, is not able to mount.""" + user_pull_basic_auth = self.user_pull["username"], self.user_pull["password"] + for i, blob in enumerate(self.blobs, 1): + content_response, _ = self.mount_blob(blob, user_pull_basic_auth) + assert content_response.status_code == 401 + + @unittest.skipIf(TOKEN_AUTH_DISABLED, "Only administrators can push content to the Registry.") + def test_mount_blobs_as_user_push(self): + """Test if a user with push permission, but not pull permission, is not able to mount.""" + user_push_basic_auth = self.user_push["username"], self.user_push["password"] + for i, blob in enumerate(self.blobs, 1): + content_response, _ = self.mount_blob(blob, user_push_basic_auth) + assert content_response.status_code == 401 + + @unittest.skipIf(TOKEN_AUTH_DISABLED, "Only administrators can push content to the Registry.") + def test_mount_blobs_as_user_anon(self): + """Test if an anonymous user with no permissions is not able to mount.""" + user_anon_basic_auth = self.user_anon["username"], self.user_anon["password"] + for i, blob in enumerate(self.blobs, 1): + content_response, _ = self.mount_blob(blob, user_anon_basic_auth) + assert content_response.status_code == 401 + + def mount_blob(self, blob, basic_auth): + """Try to mount the blob with the provided credentials.""" + mount_url = f"/v2/test-2/blobs/uploads/?from=test-1&mount={blob.digest}" + url = urljoin(self.cfg.get_base_url(), mount_url) + + if TOKEN_AUTH_DISABLED: + auth = basic_auth + else: + response = requests.post(url, auth=basic_auth) + assert response.status_code == 401 + + authenticate_header = response.headers["Www-Authenticate"] + queries = AuthenticationHeaderQueries(authenticate_header) + response = requests.get( + queries.realm, + params={ + "service": queries.service, + "scope": [queries.scope, "repository:test-1:pull"], + }, + auth=basic_auth, + ) + response.raise_for_status() + token = response.json()["token"] + auth = BearerTokenAuth(token) + + return requests.post(url, auth=auth), auth