Skip to content

Commit

Permalink
Add support for cross repository blob mount
Browse files Browse the repository at this point in the history
closes pulp#494
  • Loading branch information
lubosmj committed Mar 7, 2022
1 parent 70dbc8b commit 2d3b895
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGES/494.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for cross repository blob mount.
3 changes: 3 additions & 0 deletions docs/workflows/push.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ image.
Having disabled the token authentication, only users with staff privileges (i.e.,
administrators) are allowed to push content to the registry.

The registry supports cross repository blob mounting. When uploading blobs that already exist in
the registry as a part of a different repository, the content is not being uploaded but rather
referenced from another repository to reduce network traffic.

::

Expand Down
21 changes: 16 additions & 5 deletions pulp_container/app/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ class AuthorizationService:
according to a user's scope.
"""

def __init__(self, user, service, scope):
def __init__(self, user, service, scopes):
"""
Store class-wide variables and initialize a dictionary used for determining permissions.
Args:
user (django.contrib.auth.models.User): Requesting user.
service (str): Name of the service access is granted to.
scope (str): Scope of the resource that is to be accessed.
scopes (list): Scopes of the resources that are to be accessed.
"""
self.user = user
self.service = service
self.scope = scope
self.scopes = scopes
self.access_policy = RegistryAccessPolicy()

self.actions_permissions = defaultdict(
Expand Down Expand Up @@ -133,10 +133,21 @@ def determine_access(self):
endpoint.
"""
if not self.scope or self.scope.count(":") != 2:
if not self.scopes:
return []

typ, name, actions = self.scope.split(":")
if any(scope.count(":") != 2 for scope in self.scopes):
return []

permitted_scopes = []
for scope in self.scopes:
permitted_scopes.extend(self.permit_scope(scope))

return permitted_scopes

def permit_scope(self, scope):
"""Permit a received access scope based on user's permissions."""
typ, name, actions = scope.split(":")
actions = set(actions.split(","))

permitted_actions = set()
Expand Down
62 changes: 58 additions & 4 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def get(self, request):
service = request.query_params["service"]
except KeyError:
raise ParseError(detail="No service name provided.")
scope = request.query_params.get("scope", "")
scope = request.query_params.getlist("scope", [])

authorization_service = AuthorizationService(self.request.user, service, scope)
data = authorization_service.generate_token()
Expand Down Expand Up @@ -554,12 +554,66 @@ def create(self, request, path):
"""
_, repository = self.get_dr_push(request, path, create=True)

upload = models.Upload(repository=repository, size=0)
upload.save()
response = UploadResponse(upload=upload, path=path, request=request)
if self.tries_to_mount_blob(request):
response = self.mount_blob(request, path, repository)
else:
upload = models.Upload(repository=repository, size=0)
upload.save()
response = UploadResponse(upload=upload, path=path, request=request)

return response

@staticmethod
def tries_to_mount_blob(request):
"""Check if a client is trying to perform cross repository blob mounting."""
return (request.query_params.keys()) == {"from", "mount"}

def mount_blob(self, request, path, repository):
"""Mount a blob that is already present in another repository."""
from_path = request.query_params["from"]
try:
distribution = models.ContainerDistribution.objects.get(base_path=from_path)
except models.ContainerDistribution.DoesNotExist:
raise RepositoryNotFound(name=path)

try:
version = distribution.repository_version or distribution.repository.latest_version()
except AttributeError:
# the distribution does not contain reference to the source repository version
raise RepositoryNotFound(name=from_path)

digest = request.query_params["mount"]
try:
blob = models.Blob.objects.get(digest=digest, pk__in=version.content)
except models.Blob.DoesNotExist:
raise BlobNotFound(digest=digest)

dispatched_task = dispatch(
add_and_remove,
shared_resources=[version.repository],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)

# Wait a small amount of time
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return BlobResponse(blob, path, 201, request)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()

def partial_update(self, request, path, pk=None):
"""
Process a chunk that will be appended to an existing upload.
Expand Down
157 changes: 153 additions & 4 deletions pulp_container/tests/functional/api/test_rbac_repo_versions.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
# coding=utf-8
"""Tests that verify that RBAC for repository versions work properly."""
import requests
import unittest

from urllib.parse import urlparse
from urllib.parse import urlparse, urljoin

from pulp_smash import cli, config
from pulp_smash.pulp3.bindings import delete_orphans, monitor_task
from pulp_smash import api, cli, config
from pulp_smash.pulp3.bindings import delete_orphans, monitor_task, PulpTestCase
from pulp_smash.pulp3.utils import gen_repo

from pulpcore.client.pulp_container.exceptions import ApiException

from pulp_container.tests.functional.api import rbac_base
from pulp_container.tests.functional.constants import PULP_FIXTURE_1, REGISTRY_V2_REPO_PULP
from pulp_container.tests.functional.utils import (
TOKEN_AUTH_DISABLED,
del_user,
gen_container_client,
gen_container_remote,
gen_user,
BearerTokenAuth,
AuthenticationHeaderQueries,
)

from pulpcore.client.pulp_container import (
ContainerContainerRepository,
ContentManifestsApi,
ContentBlobsApi,
ContentTagsApi,
ContentManifestsApi,
ContainerRepositorySyncURL,
DistributionsContainerApi,
PulpContainerNamespacesApi,
RepositoriesContainerApi,
RepositoriesContainerPushApi,
Expand Down Expand Up @@ -222,3 +228,146 @@ def test_repov_read(self):
self.user_reader["repo_version_api"].read(repository.latest_version_href)
with self.assertRaises(ApiException):
self.user_helpless["repo_version_api"].read(repository.latest_version_href)


class PushCrossRepoBlobMountTestCase(PulpTestCase, rbac_base.BaseRegistryTest):
"""A test case for verifying the cross repository blob mount functionality.
The test case also verifies whether different access scopes are evaluated properly or not. For
instance, users who do not have permissions to pull and push content, they should not be able
to trigger the cross repository blob mount procedure.
"""

@classmethod
def setUpClass(cls):
"""Initialize class-wide variables and create a new repository by pushing content to it."""
cls.cfg = config.get_config()
cls.client = api.Client(cls.cfg, api.code_handler)
cls.registry = cli.RegistryClient(cls.cfg)
cls.registry.raise_if_unsupported(unittest.SkipTest, "Tests require podman/docker")

cls.registry_name = urlparse(cls.cfg.get_base_url()).netloc

admin_user, admin_password = cls.cfg.pulp_auth
cls.user_admin = {"username": admin_user, "password": admin_password}

api_client = gen_container_client()
api_client.configuration.username = cls.user_admin["username"]
api_client.configuration.password = cls.user_admin["password"]

cls.distributions_api = DistributionsContainerApi(api_client)
cls.pushrepository_api = RepositoriesContainerPushApi(api_client)
cls.repo_version_api = RepositoriesContainerVersionsApi(api_client)
cls.blobs_api = ContentBlobsApi(api_client)

cls._pull(f"{REGISTRY_V2_REPO_PULP}:manifest_a")

local_url = f"{cls.registry_name}/test-1:manifest_a"
image_path = f"{REGISTRY_V2_REPO_PULP}:manifest_a"
cls._push(image_path, local_url, cls.user_admin)

repository = cls.pushrepository_api.list(name="test-1").results[0]
cls.blobs = cls.blobs_api.list(repository_version=repository.latest_version_href).results
cls.distribution = cls.distributions_api.list(name="test-1").results[0]

cls.user_pull = gen_user(
object_permissions=[
("container.pull_containerdistribution", cls.distribution.pulp_href)
]
)
cls.user_push = gen_user(
[
"container.push_containerdistribution",
"container.namespace_push_containerdistribution",
]
)
cls.user_anon = gen_user([])

@classmethod
def tearDownClass(cls):
"""Delete created users and a distribution that was created in the first stage."""
monitor_task(cls.distributions_api.delete(cls.distribution.pulp_href).task)

delete_orphans()
del_user(cls.user_pull)
del_user(cls.user_push)
del_user(cls.user_anon)

def tearDown(self):
"""Delete a newly created repository if exists."""
distributions = self.distributions_api.list(name="test-2").results
if distributions:
monitor_task(self.distributions_api.delete(distributions[0].pulp_href).task)
delete_orphans()

def test_mount_blobs_as_admin(self):
"""Test if an admin user can trigger blob mounting successfully."""
admin_basic_auth = (self.user_admin["username"], self.user_admin["password"])
for i, blob in enumerate(self.blobs, 1):
content_response, token_auth = self.mount_blob(blob, admin_basic_auth)
assert content_response.status_code == 201
assert content_response.text == ""

blob_url = f"/v2/test-2/blobs/{blob.digest}"
url = urljoin(self.cfg.get_base_url(), blob_url)
content_response = self.client.head(url, auth=token_auth)
assert content_response.status_code == 200

repo_href = self.distributions_api.list(name="test-2").results[0].repository
version_href = self.pushrepository_api.read(repo_href).latest_version_href
assert f"{repo_href}versions/{i}/" == version_href

added_blobs = self.blobs_api.list(repository_version_added=version_href).results
assert len(added_blobs) == 1
assert added_blobs[0].digest == blob.digest

@unittest.skipIf(TOKEN_AUTH_DISABLED, "Only administrators can push content to the Registry.")
def test_mount_blobs_as_user_pull(self):
"""Test if a user with pull permission, but not push permission, is not able to mount."""
user_pull_basic_auth = self.user_pull["username"], self.user_pull["password"]
for i, blob in enumerate(self.blobs, 1):
content_response, _ = self.mount_blob(blob, user_pull_basic_auth)
assert content_response.status_code == 401

@unittest.skipIf(TOKEN_AUTH_DISABLED, "Only administrators can push content to the Registry.")
def test_mount_blobs_as_user_push(self):
"""Test if a user with push permission, but not pull permission, is not able to mount."""
user_push_basic_auth = self.user_push["username"], self.user_push["password"]
for i, blob in enumerate(self.blobs, 1):
content_response, _ = self.mount_blob(blob, user_push_basic_auth)
assert content_response.status_code == 401

@unittest.skipIf(TOKEN_AUTH_DISABLED, "Only administrators can push content to the Registry.")
def test_mount_blobs_as_user_anon(self):
"""Test if an anonymous user with no permissions is not able to mount."""
user_anon_basic_auth = self.user_anon["username"], self.user_anon["password"]
for i, blob in enumerate(self.blobs, 1):
content_response, _ = self.mount_blob(blob, user_anon_basic_auth)
assert content_response.status_code == 401

def mount_blob(self, blob, basic_auth):
"""Try to mount the blob with the provided credentials."""
mount_url = f"/v2/test-2/blobs/uploads/?from=test-1&mount={blob.digest}"
url = urljoin(self.cfg.get_base_url(), mount_url)

if TOKEN_AUTH_DISABLED:
auth = basic_auth
else:
response = requests.post(url, auth=basic_auth)
assert response.status_code == 401

authenticate_header = response.headers["Www-Authenticate"]
queries = AuthenticationHeaderQueries(authenticate_header)
response = requests.get(
queries.realm,
params={
"service": queries.service,
"scope": [queries.scope, "repository:test-1:pull"],
},
auth=basic_auth,
)
response.raise_for_status()
token = response.json()["token"]
auth = BearerTokenAuth(token)

return requests.post(url, auth=auth), auth

0 comments on commit 2d3b895

Please sign in to comment.