From 86fb60793a03996865a9d969aa9d004de48bfa41 Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Fri, 19 May 2023 12:30:33 +0200 Subject: [PATCH] Add pull-through caching closes #507 --- CHANGES/507.feature | 3 + pulp_container/app/downloaders.py | 39 +++ .../0037_create_pull_through_cache_models.py | 63 +++++ pulp_container/app/models.py | 137 +++++++++ pulp_container/app/redirects.py | 6 + pulp_container/app/registry.py | 34 +-- pulp_container/app/registry_api.py | 265 +++++++++++++++++- pulp_container/app/serializers.py | 48 +++- pulp_container/app/tasks/sync_stages.py | 77 +---- pulp_container/app/utils.py | 75 +++++ pulp_container/app/viewsets.py | 20 ++ .../functional/api/test_pull_through_cache.py | 33 +++ pulp_container/tests/functional/conftest.py | 14 + requirements.txt | 2 +- 14 files changed, 717 insertions(+), 99 deletions(-) create mode 100644 CHANGES/507.feature create mode 100644 pulp_container/app/migrations/0037_create_pull_through_cache_models.py create mode 100644 pulp_container/tests/functional/api/test_pull_through_cache.py diff --git a/CHANGES/507.feature b/CHANGES/507.feature new file mode 100644 index 000000000..ad78e4b0c --- /dev/null +++ b/CHANGES/507.feature @@ -0,0 +1,3 @@ +Added support for pull-through caching. Users can now create a distribution with a remote pointing +to a remote registry without specifying the upstream name and Pulp automatically downloads missing +content and acts as a smart proxy. diff --git a/pulp_container/app/downloaders.py b/pulp_container/app/downloaders.py index fc49b3bfe..cd14b899d 100644 --- a/pulp_container/app/downloaders.py +++ b/pulp_container/app/downloaders.py @@ -2,13 +2,18 @@ import asyncio import json import re +import tempfile from aiohttp.client_exceptions import ClientResponseError from logging import getLogger from multidict import MultiDict from urllib import parse +from django.conf import settings + +from pulpcore.plugin.models import Artifact, Task from pulpcore.plugin.download import DownloaderFactory, HttpDownloader +from pulpcore.plugin.pulp_hashlib import new as pulp_hashlib_new from pulp_container.constants import V2_ACCEPT_HEADERS @@ -94,7 +99,9 @@ async def _run(self, handle_401=True, extra_data=None): return await self._run(handle_401=False, extra_data=extra_data) else: raise + to_return = await self._handle_response(response) + await response.release() self.response_headers = response.headers @@ -102,6 +109,38 @@ async def _run(self, handle_401=True, extra_data=None): self.session.close() return to_return + def _ensure_writer_has_open_file(self): + """ + Create a temporary file on demand. + + Create a temporary file when it's actually used, allowing plugin writers to instantiate + many downloaders in memory. + + This method sets the path of NamedTemporaryFile dynamically based on whether it is running + from a task or not. Otherwise, permission errors might be raised when Pulp is trying to + download a file from api-app and write to a user space. + """ + if not self._writer: + dir_path = settings.WORKING_DIRECTORY if Task.current() is None else "." + self._writer = tempfile.NamedTemporaryFile(dir=dir_path, delete=False) + self.path = self._writer.name + self._digests = {n: pulp_hashlib_new(n) for n in Artifact.DIGEST_FIELDS} + self._size = 0 + + def fetch(self, extra_data=None): + """ + Run the download synchronously with additional data and return the `DownloadResult`. 
+ + Returns: + :class:`~pulpcore.plugin.download.DownloadResult` + or :class:`~aiohttp.client.ClientResponse` + + Raises: + Exception: Any fatal exception emitted during downloading + """ + done, _ = asyncio.get_event_loop().run_until_complete(self.run(extra_data=extra_data)) + return done.pop().result() + async def update_token(self, response_auth_header, used_token, repo_name): """ Update the Bearer token to be used with all requests. diff --git a/pulp_container/app/migrations/0037_create_pull_through_cache_models.py b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py new file mode 100644 index 000000000..d22b70e09 --- /dev/null +++ b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py @@ -0,0 +1,63 @@ +# Generated by Django 4.2.2 on 2023-06-15 09:50 + +from django.db import migrations, models +import django.db.models.deletion +import pulpcore.app.models.access_policy + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0107_distribution_hidden'), + ('container', '0036_containerpushrepository_pending_blobs_manifests'), + ] + + operations = [ + migrations.CreateModel( + name='ContainerPullThroughDistribution', + fields=[ + ('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')), + ('private', models.BooleanField(default=False, help_text='Restrict pull access to explicitly authorized users. Defaults to unrestricted pull access.')), + ], + options={ + 'default_related_name': '%(app_label)s_%(model_name)s', + }, + bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.CreateModel( + name='ContainerPullThroughRemote', + fields=[ + ('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')), + ('upstream_name', models.TextField(db_index=True)), + ], + options={ + 'default_related_name': '%(app_label)s_%(model_name)s', + }, + bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.AddField( + model_name='containerrepository', + name='pending_blobs', + field=models.ManyToManyField(related_name='pending_blobs', to='container.blob'), + ), + migrations.AddField( + model_name='containerrepository', + name='pending_manifests', + field=models.ManyToManyField(to='container.manifest'), + ), + migrations.AddField( + model_name='containerrepository', + name='pending_tags', + field=models.ManyToManyField(to='container.tag'), + ), + migrations.AddField( + model_name='containerrepository', + name='remaining_blobs', + field=models.ManyToManyField(related_name='remaining_blobs', to='container.blob'), + ), + migrations.AddField( + model_name='containerdistribution', + name='pull_through_distribution', + field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='distributions', to='container.containerpullthroughdistribution'), + ), + ] diff --git a/pulp_container/app/models.py b/pulp_container/app/models.py index d82fd9cf4..fd341db4b 100644 --- a/pulp_container/app/models.py +++ b/pulp_container/app/models.py @@ -6,6 +6,7 @@ import tempfile import time from logging import getLogger +from pathlib import PurePath from django.db import models from django.conf import settings @@ -105,6 +106,10 @@ class Manifest(Content): through_fields=("image_manifest", "manifest_list"), ) + @staticmethod + def 
init_from_artifact_and_relative_path(artifact, relative_path): + pass + class Meta: default_related_name = "%(app_label)s_%(model_name)s" unique_together = ("digest",) @@ -401,6 +406,33 @@ def namespaced_upstream_name(self): else: return self.upstream_name + def get_remote_artifact_url(self, relative_path=None, request=None): + """ + TODO: ensure that the functionality is not affected by keywords included within the path + """ + if "manifests" in request.path: + if "tag_name" in request.match_info: + tag_name = request.match_info["tag_name"] + return os.path.join(self.url, "v2", relative_path, "manifests", tag_name) + elif "digest" in request.match_info: + digest = "sha256:{digest}".format(digest=request.match_info["digest"]) + return os.path.join(self.url, "v2", relative_path, "manifests", digest) + elif "blobs" in request.path: + digest = "sha256:{digest}".format(digest=request.match_info["digest"]) + return os.path.join(self.url, "v2", relative_path, "blobs", digest) + + def get_remote_artifact_content_type(self, relative_path=None): + """ + TODO: re-evaluate the need for this method + """ + if relative_path: + type_path = PurePath(relative_path) + if type_path.match("manifests/*"): + return Manifest + elif type_path.match("blobs/*"): + return Blob + return None + class Meta: default_related_name = "%(app_label)s_%(model_name)s" permissions = [ @@ -411,6 +443,68 @@ class Meta: ] + +class ContainerPullThroughRemote(Remote, AutoAddObjPermsMixin): + """ + TODO: Add permissions. + """ + + TYPE = "pull-through" + + upstream_name = models.TextField(db_index=True) + + @property + def download_factory(self): + """ + Downloader Factory that maps to custom downloaders which support registry auth. + + Upon first access, the DownloaderFactory is instantiated and saved internally. + + Returns: + DownloaderFactory: The instantiated DownloaderFactory to be used by + get_downloader() + + """ + try: + return self._download_factory + except AttributeError: + self._download_factory = DownloaderFactory( + self, + downloader_overrides={ + "http": downloaders.RegistryAuthHttpDownloader, + "https": downloaders.RegistryAuthHttpDownloader, + }, + ) + return self._download_factory + + def get_downloader(self, remote_artifact=None, url=None, **kwargs): + """ + Get a downloader from either a RemoteArtifact or URL that is configured with this Remote. + + This method accepts either `remote_artifact` or `url` but not both. At least one is + required. If neither or both are passed a ValueError is raised. + + Args: + remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to + download. + url (str): The URL to download. + kwargs (dict): This accepts the parameters of + :class:`~pulpcore.plugin.download.BaseDownloader`. + + Raises: + ValueError: If neither remote_artifact nor url is passed, or if both are passed. + + Returns: + subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that + is configured with the remote settings. + + """ + kwargs["remote"] = self + return super().get_downloader(remote_artifact=remote_artifact, url=url, **kwargs) + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + + class ManifestSigningService(SigningService): """ Signing service used for creating container signatures.
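The downloader override above is what makes a plain get_downloader(url=...) call speak the registry token-auth protocol. A minimal sketch of the expected call pattern, assuming a previously created remote named "dockerhub" (the name and URL are illustrative; fetch() is the synchronous helper added in downloaders.py in this patch):

from pulp_container.app.models import ContainerPullThroughRemote
from pulp_container.constants import V2_ACCEPT_HEADERS

remote = ContainerPullThroughRemote.objects.get(name="dockerhub")
downloader = remote.get_downloader(
    url="https://registry-1.docker.io/v2/library/busybox/manifests/latest"
)
# fetch() drives the event loop to completion and returns a DownloadResult
# whose .path points at the downloaded temporary file.
result = downloader.fetch(extra_data={"headers": V2_ACCEPT_HEADERS})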
@@ -485,6 +579,13 @@ class ContainerRepository( ManifestSigningService, on_delete=models.SET_NULL, null=True ) + # temporary relations used for uncommitted pull-through cache operations + pending_tags = models.ManyToManyField(Tag) + pending_manifests = models.ManyToManyField(Manifest) + pending_blobs = models.ManyToManyField(Blob, related_name="pending_blobs") + # blobs that still need to be downloaded and attached to pending manifests + remaining_blobs = models.ManyToManyField(Blob, related_name="remaining_blobs") + class Meta: default_related_name = "%(app_label)s_%(model_name)s" permissions = [ @@ -507,6 +608,16 @@ def finalize_new_version(self, new_version): """ remove_duplicates(new_version) validate_repo_version(new_version) + self.remove_pending_content(new_version) + + def remove_pending_content(self, repository_version): + """Remove pending tags, manifests, and blobs when committing content to the repository.""" + added_content = repository_version.added( + base_version=repository_version.base_version + ).values_list("pk") + self.pending_tags.remove(*Tag.objects.filter(pk__in=added_content)) + self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content)) + self.pending_blobs.remove(*Blob.objects.filter(pk__in=added_content)) class ContainerPushRepository(Repository, AutoAddObjPermsMixin): @@ -563,6 +674,25 @@ def remove_pending_content(self, repository_version): self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content)) + +class ContainerPullThroughDistribution(Distribution, AutoAddObjPermsMixin): + """ + TODO: Add permissions. + """ + + TYPE = "pull-through" + + private = models.BooleanField( + default=False, + help_text=_( + "Restrict pull access to explicitly authorized users. " + "Defaults to unrestricted pull access." + ), + ) + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + + class ContainerDistribution(Distribution, AutoAddObjPermsMixin): """ A container distribution defines how a repository version is distributed by Pulp's webserver. @@ -593,6 +723,13 @@ class ContainerDistribution(Distribution, AutoAddObjPermsMixin): ) description = models.TextField(null=True) + pull_through_distribution = models.ForeignKey( + ContainerPullThroughDistribution, + related_name="distributions", + on_delete=models.CASCADE, + null=True, + ) + def get_repository_version(self): """ Returns the repository version that is supposed to be served by this ContainerDistribution.
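The pending relations above act as a staging area: content fetched through the cache is served to clients before it is committed to a repository version, and remove_pending_content() runs from finalize_new_version() to clear whatever was committed. A minimal sketch of the expected lifecycle, assuming a repository previously auto-created by the pull-through machinery (the repository name and tag are hypothetical):

from pulp_container.app.models import ContainerRepository

repository = ContainerRepository.objects.get(name="cache/library/busybox")

# While a pull is in flight, proxied units live only in the temporary relations.
tag = repository.pending_tags.get(name="latest")

# After the add_and_remove task commits the staged units, finalize_new_version()
# calls remove_pending_content(), so they drop out of the pending relations.
version = repository.latest_version()
assert not repository.pending_tags.filter(pk__in=version.content).exists()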
diff --git a/pulp_container/app/redirects.py b/pulp_container/app/redirects.py index e50d6a469..2b7723e7c 100644 --- a/pulp_container/app/redirects.py +++ b/pulp_container/app/redirects.py @@ -52,6 +52,12 @@ def issue_blob_redirect(self, blob): """ return self.redirect_to_content_app("blobs", blob.digest) + def issue_pull_through_manifests_redirect(self, pk): + return self.redirect_to_content_app("manifests", pk) + + def issue_pull_through_blobs_redirect(self, pk): + return self.redirect_to_content_app("blobs", pk) + class S3StorageRedirects(CommonRedirects): """ diff --git a/pulp_container/app/registry.py b/pulp_container/app/registry.py index 706ae8998..14ef1279f 100644 --- a/pulp_container/app/registry.py +++ b/pulp_container/app/registry.py @@ -117,7 +117,16 @@ async def get_tag(self, request): pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name ) except ObjectDoesNotExist: - raise PathNotResolved(tag_name) + if distribution.remote: + repository = await repository_version.repository.acast() + try: + tag = await repository.pending_tags.select_related("tagged_manifest").aget( + name=tag_name + ) + except ObjectDoesNotExist: + raise PathNotResolved(tag_name) + else: + raise PathNotResolved(tag_name) # we do not convert OCI to docker oci_mediatypes = [MEDIA_TYPE.MANIFEST_OCI, MEDIA_TYPE.INDEX_OCI] @@ -155,8 +164,7 @@ async def get_tag(self, request): async def dispatch_tag(self, request, tag, response_headers): """ - Finds an artifact associated with a Tag and sends it to the client, otherwise tries - to stream it. + Finds an artifact associated with a Tag and sends it to the client. Args: request(:class:`~aiohttp.web.Request`): The request to prepare a response for. @@ -169,13 +177,8 @@ async def dispatch_tag(self, request, tag, response_headers): streamed back to the client. """ - try: - artifact = await tag.tagged_manifest._artifacts.aget() - except ObjectDoesNotExist: - ca = await sync_to_async(lambda x: x[0])(tag.tagged_manifest.contentartifact_set.all()) - return await self._stream_content_artifact(request, web.StreamResponse(), ca) - else: - return await Registry._dispatch(artifact, response_headers) + artifact = await sync_to_async(tag.tagged_manifest._artifacts.get)() + return await Registry._dispatch(artifact, response_headers) @staticmethod async def dispatch_converted_schema(tag, accepted_media_types, path): @@ -219,7 +222,6 @@ async def get_by_digest(self, request): """ Return a response to the "GET" action. 
""" - path = request.match_info["path"] digest = "sha256:{digest}".format(digest=request.match_info["digest"]) distribution = await sync_to_async(self._match_distribution)(path) @@ -233,15 +235,15 @@ async def get_by_digest(self, request): content = await sync_to_async(repository_version.get_content)() repository = await sync_to_async(repository_version.repository.cast)() - if repository.PUSH_ENABLED: - pending_blobs = repository.pending_blobs.values_list("pk") - pending_manifests = repository.pending_manifests.values_list("pk") - pending_content = pending_blobs.union(pending_manifests) - content |= Content.objects.filter(pk__in=pending_content) + pending_blobs = repository.pending_blobs.values_list("pk") + pending_manifests = repository.pending_manifests.values_list("pk") + pending_content = pending_blobs.union(pending_manifests) + content |= Content.objects.filter(pk__in=pending_content) ca = await ContentArtifact.objects.select_related("artifact", "content").aget( content__in=content, relative_path=digest ) + ca_content = await sync_to_async(ca.content.cast)() if isinstance(ca_content, Blob): media_type = BLOB_CONTENT_TYPE diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index 6bb21a9ac..fb78bd102 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -11,18 +11,20 @@ import hashlib import re +from aiohttp.client_exceptions import ClientResponseError from itertools import chain -from urllib.parse import urlparse, urlunparse, parse_qs, urlencode +from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode from tempfile import NamedTemporaryFile from django.core.files.storage import default_storage as storage from django.core.files.base import ContentFile, File from django.db import IntegrityError, transaction +from django.db.models import F, Value from django.shortcuts import get_object_or_404 from django.conf import settings -from pulpcore.plugin.models import Artifact, ContentArtifact, UploadChunk +from pulpcore.plugin.models import Artifact, ContentArtifact, Content, UploadChunk from pulpcore.plugin.files import PulpTemporaryUploadedFile from pulpcore.plugin.tasking import add_and_remove, dispatch from pulpcore.plugin.util import get_objects_for_user @@ -81,6 +83,7 @@ SIGNATURE_HEADER, SIGNATURE_PAYLOAD_MAX_SIZE, SIGNATURE_TYPE, + V2_ACCEPT_HEADERS, ) log = logging.getLogger(__name__) @@ -268,7 +271,8 @@ def get_drv_pull(self, path): try: distribution = models.ContainerDistribution.objects.get(base_path=path) except models.ContainerDistribution.DoesNotExist: - raise RepositoryNotFound(name=path) + # get a pull-through cache distribution whose base_path is a substring of path + return self.get_pull_through_drv(path) if distribution.repository: repository_version = distribution.repository.latest_version() elif distribution.repository_version: @@ -277,6 +281,35 @@ def get_drv_pull(self, path): raise RepositoryNotFound(name=path) return distribution, distribution.repository, repository_version + def get_pull_through_drv(self, path): + root_cache_distribution = ( + models.ContainerPullThroughDistribution.objects.annotate(path=Value(path)) + .filter(path__startswith=F("base_path")) + .order_by("-base_path") + .first() + ) + if not root_cache_distribution: + raise RepositoryNotFound(name=path) + + cache_repository, _ = models.ContainerRepository.objects.get_or_create( + name=path, retain_repo_versions=1 + ) + + upstream_name = path.split(root_cache_distribution.base_path, maxsplit=1)[1].strip("/") + 
cache_remote, _ = models.ContainerRemote.objects.get_or_create( + upstream_name=upstream_name, name=path, url=root_cache_distribution.remote.url + ) + + cache_distribution, _ = models.ContainerDistribution.objects.get_or_create( + base_path=path, + name=path, + repository=cache_repository, + remote=cache_remote, + ) + root_cache_distribution.distributions.add(cache_distribution) + + return cache_distribution, cache_repository, cache_repository.latest_version() + def get_dr_push(self, request, path, create=False): """ Get distribution and repository for push access. @@ -803,10 +836,105 @@ def handle_safe_method(self, request, path, pk): except models.Blob.DoesNotExist: raise BlobNotFound(digest=pk) else: - raise BlobNotFound(digest=pk) + if distribution.remote: + self.pull_blob_from_remote(distribution, request, pk) + if not repository.remaining_blobs.all().exists(): + tags_to_remove = models.Tag.objects.filter( + pk__in=repository.latest_version().content.all(), + name__in=repository.pending_tags.values_list("name"), + ).exclude( + tagged_manifest__in=repository.pending_tags.values_list( + "tagged_manifest" + ) + ) + manifests_to_remove = models.Manifest.objects.filter( + pk__in=tags_to_remove.values_list("tagged_manifest") + ) + blobs_to_remove = manifests_to_remove.values_list("blobs") + remove_content_units = Content.objects.filter( + pk__in=blobs_to_remove.union( + manifests_to_remove.values_list("pk").union(tags_to_remove.values_list("pk")) + ) + ) + + pending_blobs = repository.pending_blobs.values_list("pk") + pending_manifests = repository.pending_manifests.values_list("pk") + pending_tags = repository.pending_tags.values_list("pk") + pending_content = pending_blobs.union(pending_manifests.union(pending_tags)) + add_content_units = Content.objects.filter(pk__in=pending_content) + + immediate_task = dispatch( + add_and_remove, + exclusive_resources=[repository], + kwargs={ + "repository_pk": str(repository.pk), + "add_content_units": add_content_units, + "remove_content_units": remove_content_units, + }, + immediate=True, + deferred=False, + ) + + if immediate_task.state == "completed": + pass + elif immediate_task.state == "canceled": + raise Throttled() + else: + raise Exception(str(immediate_task.error)) + + redirects = FileStorageRedirects(distribution, distribution.base_path, request) + return redirects.issue_pull_through_blobs_redirect(pk) + else: + raise BlobNotFound(digest=pk) return redirects.issue_blob_redirect(blob) + def pull_blob_from_remote(self, distribution, request, pk): + response = download_content(distribution, pk, request) + response.artifact_attributes["file"] = response.path + + digest = f'sha256:{response.artifact_attributes["sha256"]}' + artifact = _save_artifact(response.artifact_attributes) + blob = self._save_blob(artifact, digest) + + repository = distribution.repository.cast() + repository.remaining_blobs.remove(blob) + repository.pending_blobs.add(blob) + + manifests = repository.pending_manifests.exclude( + media_type__in=( + models.MEDIA_TYPE.MANIFEST_LIST, + models.MEDIA_TYPE.INDEX_OCI, + ) + ) + for m in manifests: + m_data = _read_manifest(m) + if m_data["config"]["digest"] == blob.digest: + m.config_blob = blob + m.save() + elif any(blob.digest == layer["digest"] for layer in m_data["layers"]): + models.BlobManifest(manifest=m, manifest_blob=blob).save() + + return blob + + def _save_blob(self, artifact, digest): + # the blob in question should be already saved after pulling
its manifest + blob = models.Blob.objects.get(digest=digest) + blob.touch() + + ca = ContentArtifact(artifact=artifact, content=blob, relative_path=digest) + try: + ca.save() + except IntegrityError: + # re-upload artifact in case it was previously removed. + ca = ContentArtifact.objects.get(content=blob, relative_path=digest) + if not ca.artifact: + ca.artifact = artifact + ca.save(update_fields=["artifact"]) + return blob + class Manifests(RedirectsMixin, ContainerRegistryApiMixin, ViewSet): """ @@ -841,13 +969,28 @@ def handle_safe_method(self, request, path, pk): try: tag = models.Tag.objects.get(name=pk, pk__in=repository_version.content) except models.Tag.DoesNotExist: - raise ManifestNotFound(reference=pk) + if distribution.remote: + manifest = self.pull_manifest_from_remote(distribution, request, pk) + + tag = models.Tag(name=pk, tagged_manifest=manifest) + try: + tag.save() + except IntegrityError: + tag = models.Tag.objects.get(name=tag.name, tagged_manifest=manifest) + tag.touch() + + repository.pending_tags.add(tag) + + redirects = FileStorageRedirects(distribution, distribution.base_path, request) + return redirects.issue_pull_through_manifests_redirect(pk) + else: + raise ManifestNotFound(reference=pk) return redirects.issue_tag_redirect(tag) else: try: manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) - except models.Manifest.DoesNotExit: + except models.Manifest.DoesNotExist: if repository.PUSH_ENABLED: # the manifest might be a part of listed manifests currently being uploaded try: @@ -856,10 +999,76 @@ def handle_safe_method(self, request, path, pk): except models.Manifest.DoesNotExist: raise ManifestNotFound(reference=pk) else: - ManifestNotFound(reference=pk) + if distribution.remote: + manifest = self.pull_manifest_from_remote(distribution, request, pk) + manifest_lists = repository.pending_manifests.filter( + media_type__in=( + models.MEDIA_TYPE.MANIFEST_LIST, + models.MEDIA_TYPE.INDEX_OCI, + ) + ) + for ml in manifest_lists: + ml_data = _read_manifest(ml) + for m in ml_data["manifests"]: + if m["digest"] == manifest.digest: + platform = m["platform"] + models.ManifestListManifest.objects.get_or_create( + manifest_list=manifest, + image_manifest=ml, + architecture=platform["architecture"], + os=platform["os"], + features=platform.get("features", ""), + variant=platform.get("variant", ""), + os_version=platform.get("os.version", ""), + os_features=platform.get("os.features", ""), + ) + break + + redirects = FileStorageRedirects( + distribution, distribution.base_path, request + ) + return redirects.issue_pull_through_manifests_redirect(pk) + else: + raise ManifestNotFound(reference=pk) return redirects.issue_manifest_redirect(manifest) + def pull_manifest_from_remote(self, distribution, request, pk): + response = download_content(distribution, pk, request) + + with open(response.path, "rb") as content_file: + raw_data = content_file.read() + response.artifact_attributes["file"] = response.path + + manifest_data = json.loads(raw_data) + digest = f'sha256:{response.artifact_attributes["sha256"]}' + media_type = determine_media_type(manifest_data, response) + + artifact = _save_artifact(response.artifact_attributes) + manifest = self._save_manifest(artifact, digest, media_type) + + repository = distribution.repository.cast() + content = repository.latest_version().get_content() + if not content.filter(pk=manifest.pk).exists(): + repository.pending_manifests.add(manifest) + manifest.touch() + + if media_type not in ( +
models.MEDIA_TYPE.MANIFEST_LIST, + models.MEDIA_TYPE.INDEX_OCI, + ): + blob_digests = [layer["digest"] for layer in manifest_data["layers"]] + blob_digests.append(manifest_data["config"]["digest"]) + for d in blob_digests: + blob = models.Blob(digest=d) + try: + blob.save() + except IntegrityError: + blob = models.Blob.objects.get(digest=d) + repository.remaining_blobs.add(blob) + + return manifest + def put(self, request, path, pk=None): """ Responds with the actual manifest @@ -1102,7 +1311,13 @@ def get(self, request, path, pk): try: manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) except models.Manifest.DoesNotExist: - raise ManifestNotFound(reference=pk) + try: + manifest = models.Manifest.objects.get( + digest=pk, + pk__in=repository_version.repository.pending_manifests.values_list("pk"), + ) + except models.Manifest.DoesNotExist: + raise ManifestNotFound(reference=pk) signatures = models.ManifestSignature.objects.filter( signed_manifest=manifest, pk__in=repository_version.content @@ -1191,3 +1406,37 @@ def put(self, request, path, pk): return ManifestSignatureResponse(signature, path) else: raise Exception(str(immediate_task.error)) + + +def download_content(distribution, pk, request): + remote = distribution.remote.cast() + content_type = request.path.rsplit("/", maxsplit=2)[-2] + + relative_url = f"/v2/{remote.namespaced_upstream_name}/{content_type}/{pk}" + url = urljoin(remote.url, relative_url) + + downloader = remote.get_downloader(url=url) + try: + response = downloader.fetch(extra_data={"headers": V2_ACCEPT_HEADERS}) + except ClientResponseError: + if content_type == "manifests": + raise ManifestNotFound(reference=pk) + else: + raise BlobNotFound(digest=pk) + return response + + +def _save_artifact(artifact_attributes): + saved_artifact = Artifact(**artifact_attributes) + try: + saved_artifact.save() + except IntegrityError: + del artifact_attributes["file"] + saved_artifact = Artifact.objects.get(**artifact_attributes) + saved_artifact.touch() + return saved_artifact + + +def _read_manifest(manifest): + artifact = manifest._artifacts.get() + raw_data = artifact.file.read() + manifest_data = json.loads(raw_data) + artifact.file.close() + return manifest_data diff --git a/pulp_container/app/serializers.py b/pulp_container/app/serializers.py index 87393a713..64a1b3bba 100644 --- a/pulp_container/app/serializers.py +++ b/pulp_container/app/serializers.py @@ -277,6 +277,22 @@ class Meta: model = models.ContainerRemote + +class ContainerPullThroughRemoteSerializer(RemoteSerializer): + """ + TODO: Customize help messages for url, credentials. + """ + + policy = serializers.ChoiceField( + help_text="The policy always mimics the on_demand behaviour when performing pull-through.", + choices=((models.Remote.ON_DEMAND, "When syncing, download just the metadata."),), + default=models.Remote.ON_DEMAND, + ) + + class Meta: + fields = RemoteSerializer.Meta.fields + model = models.ContainerPullThroughRemote + + class ContainerDistributionSerializer(DistributionSerializer): """ A serializer for ContainerDistribution.
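A sketch of how the serializers above are expected to be exercised over the REST API, using the generated Python bindings (host, credentials, and object names are placeholders; distribution creation is asynchronous, so the returned task has to finish before the distribution exists):

from pulpcore.client.pulp_container import (
    ApiClient,
    Configuration,
    DistributionsPullThroughApi,
    RemotesPullThroughApi,
)

config = Configuration(host="http://localhost:24817", username="admin", password="password")
client = ApiClient(config)

# Remote creation is synchronous; the new object is returned directly.
remote = RemotesPullThroughApi(client).create(
    {"name": "dockerhub", "url": "https://registry-1.docker.io"}
)

# Distribution creation dispatches a task; wait on the returned task href before use.
task = DistributionsPullThroughApi(client).create(
    {"name": "dockerhub-cache", "base_path": "dockerhub-cache", "remote": remote.pulp_href}
)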
@@ -309,10 +325,16 @@ class ContainerDistributionSerializer(DistributionSerializer): repository_version = RepositoryVersionRelatedField( required=False, help_text=_("RepositoryVersion to be served"), allow_null=True ) + remote = DetailRelatedField( + required=False, + help_text=_("Remote that can be used to fetch content when using pull-through caching."), + view_name_pattern=r"remotes(-.*/.*)?-detail", + queryset=models.ContainerRemote.objects.all(), + ) def validate(self, data): """ - Validate the ContainterDistribution. + Validate the ContainerDistribution. Make sure there is an instance of ContentRedirectContentGuard always present in validated data. @@ -360,12 +382,36 @@ class Meta: fields = tuple(set(DistributionSerializer.Meta.fields) - {"base_url"}) + ( "repository_version", "registry_path", + "remote", "namespace", "private", "description", ) +class ContainerPullThroughDistributionSerializer(DistributionSerializer): + """ + TODO: Customize help messages for base paths, names (e.g., dockerhub-cache). + """ + + remote = DetailRelatedField( + help_text=_("Remote that can be used to fetch content when using pull-through caching."), + view_name_pattern=r"remotes(-.*/.*)-detail", + queryset=models.ContainerPullThroughRemote.objects.all(), + ) + distributions = DetailRelatedField( + many=True, + help_text="Distributions created after pulling content through cache", + view_name="distributions-detail", + queryset=models.ContainerDistribution.objects.all(), + required=False, + ) + + class Meta: + model = models.ContainerPullThroughDistribution + fields = DistributionSerializer.Meta.fields + ("remote", "distributions") + + class TagOperationSerializer(ValidateFieldsMixin, serializers.Serializer): """ A base serializer for tagging and untagging manifests. diff --git a/pulp_container/app/tasks/sync_stages.py b/pulp_container/app/tasks/sync_stages.py index 96b233c89..7a340d6ef 100644 --- a/pulp_container/app/tasks/sync_stages.py +++ b/pulp_container/app/tasks/sync_stages.py @@ -20,6 +20,7 @@ SIGNATURE_HEADER, SIGNATURE_SOURCE, SIGNATURE_TYPE, + V2_ACCEPT_HEADERS, ) from pulp_container.app.models import ( Blob, @@ -34,6 +35,7 @@ urlpath_sanitize, determine_media_type, validate_manifest, + calculate_digest, ) log = logging.getLogger(__name__) @@ -382,7 +384,7 @@ def create_tagged_manifest_list(self, tag_name, saved_artifact, manifest_list_da tag_name (str): A name of a tag saved_artifact (pulpcore.plugin.models.Artifact): A saved manifest's Artifact manifest_list_data (dict): Data about a ManifestList - media_type (str): The type of a manifest + media_type (str): The type of manifest """ digest = f"sha256:{saved_artifact.sha256}" @@ -411,7 +413,7 @@ def create_tagged_manifest(self, tag_name, saved_artifact, manifest_data, raw_da if media_type in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI): digest = f"sha256:{saved_artifact.sha256}" else: - digest = self._calculate_digest(raw_data) + digest = calculate_digest(raw_data) manifest = Manifest( digest=digest, schema_version=manifest_data["schemaVersion"], media_type=media_type @@ -649,77 +651,6 @@ def _include_layer(self, layer): return False return True - def _calculate_digest(self, manifest): - """ - Calculate the requested digest of the ImageManifest, given in JSON. - - Args: - manifest (str): The raw JSON representation of the Manifest. - - Returns: - str: The digest of the given ImageManifest - - """ - decoded_manifest = json.loads(manifest) - if "signatures" in decoded_manifest: - # This manifest contains signatures. 
Unfortunately, the Docker manifest digest - # is calculated on the unsigned version of the Manifest so we need to remove the - # signatures. To do this, we will look at the 'protected' key within the first - # signature. This key indexes a (malformed) base64 encoded JSON dictionary that - # tells us how many bytes of the manifest we need to keep before the signature - # appears in the original JSON and what the original ending to the manifest was after - # the signature block. We will strip out the bytes after this cutoff point, add back the - # original ending, and then calculate the sha256 sum of the transformed JSON to get the - # digest. - protected = decoded_manifest["signatures"][0]["protected"] - # Add back the missing padding to the protected block so that it is valid base64. - protected = self._pad_unpadded_b64(protected) - # Now let's decode the base64 and load it as a dictionary so we can get the length - protected = base64.b64decode(protected) - protected = json.loads(protected) - # This is the length of the signed portion of the Manifest, except for a trailing - # newline and closing curly brace. - signed_length = protected["formatLength"] - # The formatTail key indexes a base64 encoded string that represents the end of the - # original Manifest before signatures. We will need to add this string back to the - # trimmed Manifest to get the correct digest. We'll do this as a one liner since it is - # a very similar process to what we've just done above to get the protected block - # decoded. - signed_tail = base64.b64decode(self._pad_unpadded_b64(protected["formatTail"])) - # Now we can reconstruct the original Manifest that the digest should be based on. - manifest = manifest[:signed_length] + signed_tail - - return "sha256:{digest}".format(digest=hashlib.sha256(manifest).hexdigest()) - - def _pad_unpadded_b64(self, unpadded_b64): - """ - Fix bad padding. - - Docker has not included the required padding at the end of the base64 encoded - 'protected' block, or in some encased base64 within it. This function adds the correct - number of ='s signs to the unpadded base64 text so that it can be decoded with Python's - base64 library. - - Args: - unpadded_b64 (str): The unpadded base64 text. - - Returns: - str: The same base64 text with the appropriate number of ='s symbols. - - """ - # The Pulp team has not observed any newlines or spaces within the base64 from Docker, but - # Docker's own code does this same operation so it seemed prudent to include it here. - # See lines 167 to 168 here: - # https://github.com/docker/libtrust/blob/9cbd2a1374f46905c68a4eb3694a130610adc62a/util.go - unpadded_b64 = unpadded_b64.replace("\n", "").replace(" ", "") - # It is illegal base64 for the remainder to be 1 when the length of the block is - # divided by 4. 
- if len(unpadded_b64) % 4 == 1: - raise ValueError("Invalid base64: {t}".format(t=unpadded_b64)) - # Add back the missing padding characters, based on the length of the encoded string - paddings = {0: "", 2: "==", 3: "="} - return unpadded_b64 + paddings[len(unpadded_b64) % 4] - class ContainerContentSaver(ContentSaver): """Container specific content saver stage to add content associations.""" diff --git a/pulp_container/app/utils.py b/pulp_container/app/utils.py index fe6c6e71a..aa1711331 100644 --- a/pulp_container/app/utils.py +++ b/pulp_container/app/utils.py @@ -1,3 +1,5 @@ +import base64 +import hashlib import re import subprocess import gnupg @@ -213,3 +215,76 @@ def validate_manifest(content_data, media_type, digest): raise ManifestInvalid( reason=f'{".".join(map(str, error.path))}: {error.message}', digest=digest ) + + +def calculate_digest(manifest): + """ + Calculate the requested digest of the ImageManifest, given in JSON. + + Args: + manifest (str): The raw JSON representation of the Manifest. + + Returns: + str: The digest of the given ImageManifest + + """ + decoded_manifest = json.loads(manifest) + if "signatures" in decoded_manifest: + # This manifest contains signatures. Unfortunately, the Docker manifest digest + # is calculated on the unsigned version of the Manifest so we need to remove the + # signatures. To do this, we will look at the 'protected' key within the first + # signature. This key indexes a (malformed) base64 encoded JSON dictionary that + # tells us how many bytes of the manifest we need to keep before the signature + # appears in the original JSON and what the original ending to the manifest was after + # the signature block. We will strip out the bytes after this cutoff point, add back the + # original ending, and then calculate the sha256 sum of the transformed JSON to get the + # digest. + protected = decoded_manifest["signatures"][0]["protected"] + # Add back the missing padding to the protected block so that it is valid base64. + protected = pad_unpadded_b64(protected) + # Now let's decode the base64 and load it as a dictionary so we can get the length + protected = base64.b64decode(protected) + protected = json.loads(protected) + # This is the length of the signed portion of the Manifest, except for a trailing + # newline and closing curly brace. + signed_length = protected["formatLength"] + # The formatTail key indexes a base64 encoded string that represents the end of the + # original Manifest before signatures. We will need to add this string back to the + # trimmed Manifest to get the correct digest. We'll do this as a one liner since it is + # a very similar process to what we've just done above to get the protected block + # decoded. + signed_tail = base64.b64decode(pad_unpadded_b64(protected["formatTail"])) + # Now we can reconstruct the original Manifest that the digest should be based on. + manifest = manifest[:signed_length] + signed_tail + + return "sha256:{digest}".format(digest=hashlib.sha256(manifest).hexdigest()) + + +def pad_unpadded_b64(unpadded_b64): + """ + Fix bad padding. + + Docker has not included the required padding at the end of the base64 encoded + 'protected' block, or in some encased base64 within it. This function adds the correct + number of ='s signs to the unpadded base64 text so that it can be decoded with Python's + base64 library. + + Args: + unpadded_b64 (str): The unpadded base64 text. + + Returns: + str: The same base64 text with the appropriate number of ='s symbols. 
+ + """ + # The Pulp team has not observed any newlines or spaces within the base64 from Docker, but + # Docker's own code does this same operation so it seemed prudent to include it here. + # See lines 167 to 168 here: + # https://github.com/docker/libtrust/blob/9cbd2a1374f46905c68a4eb3694a130610adc62a/util.go + unpadded_b64 = unpadded_b64.replace("\n", "").replace(" ", "") + # It is illegal base64 for the remainder to be 1 when the length of the block is + # divided by 4. + if len(unpadded_b64) % 4 == 1: + raise ValueError("Invalid base64: {t}".format(t=unpadded_b64)) + # Add back the missing padding characters, based on the length of the encoded string + paddings = {0: "", 2: "==", 3: "="} + return unpadded_b64 + paddings[len(unpadded_b64) % 4] diff --git a/pulp_container/app/viewsets.py b/pulp_container/app/viewsets.py index 7dc2b1155..4536235f6 100644 --- a/pulp_container/app/viewsets.py +++ b/pulp_container/app/viewsets.py @@ -429,6 +429,16 @@ class ContainerRemoteViewSet(RemoteViewSet, RolesMixin): } +class ContainerPullThroughRemoteViewSet(RemoteViewSet, RolesMixin): + """ + TODO: Add permissions. + """ + + endpoint_name = "pull-through" + queryset = models.ContainerPullThroughRemote.objects.all() + serializer_class = serializers.ContainerPullThroughRemoteSerializer + + class TagOperationsMixin: """ A mixin that adds functionality for creating and deleting tags. @@ -1302,6 +1312,16 @@ def destroy(self, request, pk, **kwargs): return OperationPostponedResponse(async_result, request) +class ContainerPullThroughDistributionViewSet(DistributionViewSet, RolesMixin): + """ + TODO: Add permissions. + """ + + endpoint_name = "pull-through" + queryset = models.ContainerPullThroughDistribution.objects.all() + serializer_class = serializers.ContainerPullThroughDistributionSerializer + + class ContainerNamespaceViewSet( NamedModelViewSet, mixins.CreateModelMixin, diff --git a/pulp_container/tests/functional/api/test_pull_through_cache.py b/pulp_container/tests/functional/api/test_pull_through_cache.py new file mode 100644 index 000000000..82e91981e --- /dev/null +++ b/pulp_container/tests/functional/api/test_pull_through_cache.py @@ -0,0 +1,33 @@ +from uuid import uuid4 + + +def consume_content( + delete_orphans_pre, + gen_object_with_cleanup, + container_pull_through_remote_api, + container_pull_through_distribution_api, + registry_client, + local_registry, + container_repository_api, + container_remote_api, + container_distribution_api, +): + data = {"name": str(uuid4()), "url": "https://registry-1.docker.io"} + remote = gen_object_with_cleanup(container_pull_through_remote_api, data) + + data = {"name": str(uuid4()), "base_path": str(uuid4()), "remote": remote.pulp_href} + distribution = gen_object_with_cleanup(container_pull_through_distribution_api, data) + + remote_image_path = "library/busybox" + + registry_client.pull(f"docker.io/{remote_image_path}") + remote_image = registry_client.inspect(f"docker.io/{remote_image_path}") + + local_registry.pull(f"{distribution.base_path}/{remote_image_path}") + local_image = local_registry.inspect(f"{distribution.base_path}/{remote_image_path}") + + assert local_image[0]["Id"] == remote_image[0]["Id"] + + container_repository_api.read(name=remote_image_path) + container_remote_api.read(name=remote_image_path) + container_distribution_api.read(name=remote_image_path) diff --git a/pulp_container/tests/functional/conftest.py b/pulp_container/tests/functional/conftest.py index c552533dc..2c447cb55 100644 --- 
a/pulp_container/tests/functional/conftest.py +++ b/pulp_container/tests/functional/conftest.py @@ -13,11 +13,13 @@ ApiClient, PulpContainerNamespacesApi, RemotesContainerApi, + RemotesPullThroughApi, RepositoriesContainerApi, RepositoriesContainerPushApi, RepositoriesContainerVersionsApi, RepositoriesContainerPushVersionsApi, DistributionsContainerApi, + DistributionsPullThroughApi, ContentTagsApi, ContentManifestsApi, ContentBlobsApi, @@ -323,6 +325,12 @@ def container_remote_api(container_client): return RemotesContainerApi(container_client) + +@pytest.fixture(scope="session") +def container_pull_through_remote_api(container_client): + """Pull-through cache container remote API fixture.""" + return RemotesPullThroughApi(container_client) + + @pytest.fixture(scope="session") def container_repository_api(container_client): """Container repository API fixture.""" @@ -353,6 +361,12 @@ def container_distribution_api(container_client): return DistributionsContainerApi(container_client) + +@pytest.fixture(scope="session") +def container_pull_through_distribution_api(container_client): + """Pull-through cache distribution API fixture.""" + return DistributionsPullThroughApi(container_client) + + @pytest.fixture(scope="session") def container_tag_api(container_client): """Container tag API fixture.""" diff --git a/requirements.txt b/requirements.txt index f585185f7..e4a701a05 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ ecdsa>=0.14,<=0.18.0 jsonschema>=4.4,<4.19 -pulpcore>=3.25.0,<3.40 +pulpcore>=3.27.0,<3.40 pyjwkest>=1.4,<=1.4.2 pyjwt[crypto]>=2.4,<2.9
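For reference, the upstream URL that download_content() in registry_api.py composes on a cache miss can be reproduced as follows (the registry, image name, and reference are illustrative):

from urllib.parse import urljoin

remote_url = "https://registry-1.docker.io"  # ContainerRemote.url
upstream_name = "library/busybox"  # remote.namespaced_upstream_name
reference = "latest"  # a tag name, or a "sha256:..." digest

# content_type is parsed from the incoming request path: "manifests" or "blobs"
relative_url = f"/v2/{upstream_name}/manifests/{reference}"
assert urljoin(remote_url, relative_url) == (
    "https://registry-1.docker.io/v2/library/busybox/manifests/latest"
)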