diff --git a/CHANGES/507.feature b/CHANGES/507.feature index df2a3bbd8..037ccbb01 100644 --- a/CHANGES/507.feature +++ b/CHANGES/507.feature @@ -1,3 +1,3 @@ Added support for pull-through caching. Users can now configure a dedicated distribution and remote -linked to an external registry without specifying a repository name (upstream name). Pulp downloads -missing content automatically if requested and acts as a caching proxy. +linked to an external registry without the need to create and mirror repositories in advance. Pulp +downloads missing content on demand and acts as a caching proxy. diff --git a/docs/workflows/host.rst b/docs/workflows/host.rst index c8225fffe..185cb4014 100644 --- a/docs/workflows/host.rst +++ b/docs/workflows/host.rst @@ -123,32 +123,35 @@ Pull-Through Caching -------------------- The Pull-Through Caching feature offers an alternative way to host content by leveraging a **remote -registry** as the source of truth. This eliminates the need for repository synchronization, reducing -storage overhead, and ensuring up-to-date images. Pulp acts as a **caching proxy** and stores images -in a local repository. +registry** as the source of truth. This eliminates the need to synchronize repositories in advance: +Pulp acts as a **caching proxy** and stores images in a local repository after they have been +pulled by an end client. -Administering the caching:: +Configuring the caching:: # initialize a pull-through remote (the concept of upstream-name is not applicable here) REMOTE_HREF=$(http ${BASE_ADDR}/pulp/api/v3/remotes/container/pull-through/ name=docker-cache url=https://registry-1.docker.io | jq -r ".pulp_href") - # create a specialized distribution linked to the initialized remote + # create a pull-through distribution linked to the initialized remote http ${BASE_ADDR}/pulp/api/v3/distributions/container/pull-through/ remote=${REMOTE_HREF} name=docker-cache base_path=docker-cache -Downloading content:: +Pulling content:: podman pull localhost:24817/docker-cache/library/busybox -In the example above, the image "busybox" is pulled from the "docker-cache" distribution, acting as -a transparent caching layer. +In the example above, the image "busybox" is pulled from *DockerHub* through the "docker-cache" +distribution, which acts as a transparent caching layer. -By incorporating the Pull-Through Caching feature, administrators can **reduce external network -dependencies**, and ensure a more reliable and responsive container deployment system in production -environments. +By incorporating the Pull-Through Caching feature into standard workflows, users **do not need** to +pre-configure and sync a new repository to retrieve the actual content. This speeds up the whole +process of shipping containers, from the early management stages to distribution. As with +on-demand syncing, the feature **reduces external network dependencies** and ensures a more +reliable container deployment system in production environments. .. note:: - Pulp creates repositories that maintain a single repository version for user-pulled images. + Pulp creates repositories that maintain a single repository version for pulled images. Thus, only the latest repository version is retained. For instance, when pulling "debian:10," - a "debian" repository with the "10" tag is established. Subsequent pulls such as "debian:11" + a "debian" repository with the "10" tag is created. Subsequent pulls such as "debian:11" result in a new repository version that incorporates both tags while removing the previous - version. Repositories and their content remain manageable through standard API endpoints. + version. Repositories and their content remain manageable through standard Pulp API endpoints. + Note, however, that content cannot be pushed to these repositories. diff --git a/pulp_container/app/content.py b/pulp_container/app/content.py index fae311e4e..fcdc7786f 100644 --- a/pulp_container/app/content.py +++ b/pulp_container/app/content.py @@ -6,9 +6,11 @@ registry = Registry() app.add_routes( - [web.get(r"/pulp/container/{path:.+}/blobs/sha256:{digest:.+}", registry.get_by_digest)] -) -app.add_routes( - [web.get(r"/pulp/container/{path:.+}/manifests/sha256:{digest:.+}", registry.get_by_digest)] + [ + web.get( + r"/pulp/container/{path:.+}/{content:(blobs|manifests)}/sha256:{digest:.+}", + registry.get_by_digest, + ) + ] ) app.add_routes([web.get(r"/pulp/container/{path:.+}/manifests/{tag_name}", registry.get_tag)]) diff --git a/pulp_container/app/downloaders.py b/pulp_container/app/downloaders.py index f6c81fbef..bcde18e5e 100644 --- a/pulp_container/app/downloaders.py +++ b/pulp_container/app/downloaders.py @@ -16,7 +16,10 @@ log = getLogger(__name__) -InMemoryDownloadResult = namedtuple("InMemoryDownloadResult", ["data", "headers", "status_code"]) +HeadResult = namedtuple( + "HeadResult", + ["status_code", "path", "artifact_attributes", "url", "headers"], +) class RegistryAuthHttpDownloader(HttpDownloader): """ @@ -27,13 +30,13 @@ class RegistryAuthHttpDownloader(HttpDownloader): """ registry_auth = {"bearer": None, "basic": None} + token_lock = asyncio.Lock() def __init__(self, *args, **kwargs): """ Initialize the downloader. """ self.remote = kwargs.pop("remote") - self.token_lock = asyncio.Lock() super().__init__(*args, **kwargs) @@ -99,7 +102,12 @@ async def _run(self, handle_401=True, extra_data=None): return await self._run(handle_401=False, extra_data=extra_data) else: raise - to_return = await self._handle_response(response) + + if http_method == "head": + to_return = await self._handle_head_response(response) + else: + to_return = await self._handle_response(response) + await response.release() self.response_headers = response.headers @@ -177,14 +185,13 @@ def auth_header(token, basic_auth): return {"Authorization": basic_auth} return {} - -class InMemoryDownloader(RegistryAuthHttpDownloader): - """A downloader class suited for downloading data in-memory.""" - - async def _handle_response(self, response): - data = await response.text() - return InMemoryDownloadResult( - data=data, headers=response.headers, status_code=response.status + async def _handle_head_response(self, response): + return HeadResult( + status_code=response.status, + path=None, + artifact_attributes=None, + url=self.url, + headers=response.headers, ) diff --git a/pulp_container/app/migrations/0037_create_pull_through_cache_models.py b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py index b247a6cfa..2ed4ff8e1 100644 --- a/pulp_container/app/migrations/0037_create_pull_through_cache_models.py +++ b/pulp_container/app/migrations/0037_create_pull_through_cache_models.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.6 on 2023-10-25 20:04 +# Generated by Django 4.2.8 on 2023-12-12 21:15 from django.db import migrations, models import django.db.models.deletion @@ -8,32 +8,45 @@ class Migration(migrations.Migration): dependencies = [ - ('core', '0108_task_versions'), + ('core',
'0116_alter_remoteartifact_md5_alter_remoteartifact_sha1_and_more'), ('container', '0036_containerpushrepository_pending_blobs_manifests'), ] operations = [ migrations.CreateModel( - name='ContainerPullThroughDistribution', + name='ContainerPullThroughRemote', fields=[ - ('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')), + ('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')), ], options={ - 'permissions': [('manage_roles_containerpullthroughdistribution', 'Can manage role assignments on pull-through cache distribution')], + 'permissions': [('manage_roles_containerpullthroughremote', 'Can manage role assignments on pull-through container remote')], 'default_related_name': '%(app_label)s_%(model_name)s', }, - bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.AddField( + model_name='containerrepository', + name='pending_blobs', + field=models.ManyToManyField(to='container.blob'), + ), + migrations.AddField( + model_name='containerrepository', + name='pending_manifests', + field=models.ManyToManyField(to='container.manifest'), ), migrations.CreateModel( - name='ContainerPullThroughRemote', + name='ContainerPullThroughDistribution', fields=[ - ('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')), + ('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')), + ('private', models.BooleanField(default=False, help_text='Restrict pull access to explicitly authorized users. Related distributions inherit this value. Defaults to unrestricted pull access.')), + ('description', models.TextField(null=True)), + ('namespace', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='container_pull_through_distributions', to='container.containernamespace')), ], options={ - 'permissions': [('manage_roles_containerpullthroughremote', 'Can manage role assignments on pull-through container remote')], + 'permissions': [('manage_roles_containerpullthroughdistribution', 'Can manage role assignments on pull-through cache distribution')], 'default_related_name': '%(app_label)s_%(model_name)s', }, - bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin), ), migrations.AddField( model_name='containerdistribution', diff --git a/pulp_container/app/models.py b/pulp_container/app/models.py index dea2db436..e21ede94c 100644 --- a/pulp_container/app/models.py +++ b/pulp_container/app/models.py @@ -334,33 +334,6 @@ def noauth_download_factory(self): ) return self._noauth_download_factory - @property - def in_memory_download_factory(self): - """ - A Downloader Factory that stores downloaded data in-memory. - - This downloader should be used in workflows where the size of downloaded content is - reasonably small. For instance, for downloading manifests or manifest lists. - - Upon first access, the InMemoryDownloaderFactory is instantiated and saved internally. 
- - Returns: - DownloadFactory: The instantiated InMemoryDownloaderFactory to be used by - get_in_memory_downloader(). - - """ - try: - return self._in_memory_download_factory - except AttributeError: - self._in_memory_download_factory = DownloaderFactory( - self, - downloader_overrides={ - "http": downloaders.InMemoryDownloader, - "https": downloaders.InMemoryDownloader, - }, - ) - return self._in_memory_download_factory - def get_downloader(self, remote_artifact=None, url=None, **kwargs): """ Get a downloader from either a RemoteArtifact or URL that is configured with this Remote. @@ -415,36 +388,6 @@ def get_noauth_downloader(self, remote_artifact=None, url=None, **kwargs): **kwargs, ) - def get_in_memory_downloader(self, remote_artifact=None, url=None, **kwargs): - """ - Get an in-memory downloader from either a RemoteArtifact or URL that is provided. - - This method accepts either `remote_artifact` or `url` but not both. At least one is - required. If neither of both are passed a ValueError is raised. - - Args: - remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to - download. - url (str): The URL to download. - kwargs (dict): This accepts the parameters of - :class:`~pulpcore.plugin.download.BaseDownloader`. - - Raises: - ValueError: If neither remote_artifact and url are passed, or if both are passed. - - Returns: - subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that - is configured with the remote settings. - - """ - kwargs["remote"] = self - return super().get_downloader( - remote_artifact=remote_artifact, - url=url, - download_factory=self.in_memory_download_factory, - **kwargs, - ) - @property def namespaced_upstream_name(self): """ @@ -473,58 +416,11 @@ class Meta: class ContainerPullThroughRemote(Remote, AutoAddObjPermsMixin): """ A remote for pull-through caching, omitting the requirement for the upstream name. - """ - - TYPE = "pull-through" - - @property - def download_factory(self): - """ - Downloader Factory that maps to custom downloaders which support registry auth. - - Upon first access, the DownloaderFactory is instantiated and saved internally. - - Returns: - DownloadFactory: The instantiated DownloaderFactory to be used by - get_downloader() - - """ - try: - return self._download_factory - except AttributeError: - self._download_factory = DownloaderFactory( - self, - downloader_overrides={ - "http": downloaders.RegistryAuthHttpDownloader, - "https": downloaders.RegistryAuthHttpDownloader, - }, - ) - return self._download_factory - - def get_downloader(self, remote_artifact=None, url=None, **kwargs): - """ - Get a downloader from either a RemoteArtifact or URL that is configured with this Remote. - - This method accepts either `remote_artifact` or `url` but not both. At least one is - required. If neither or both are passed a ValueError is raised. - - Args: - remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to - download. - url (str): The URL to download. - kwargs (dict): This accepts the parameters of - :class:`~pulpcore.plugin.download.BaseDownloader`. - - Raises: - ValueError: If neither remote_artifact and url are passed, or if both are passed. - - Returns: - subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that - is configured with the remote settings. 
- """ - kwargs["remote"] = self - return super().get_downloader(remote_artifact=remote_artifact, url=url, **kwargs) + This remote is used for instantiating new regular container remotes with the upstream name. + Configuring credentials and everything related to container workflows can be therefore done + from within a single instance of this remote. + """ class Meta: default_related_name = "%(app_label)s_%(model_name)s" @@ -609,6 +505,8 @@ class ContainerRepository( manifest_signing_service = models.ForeignKey( ManifestSigningService, on_delete=models.SET_NULL, null=True ) + pending_blobs = models.ManyToManyField(Blob) + pending_manifests = models.ManyToManyField(Manifest) class Meta: default_related_name = "%(app_label)s_%(model_name)s" @@ -632,6 +530,15 @@ def finalize_new_version(self, new_version): """ remove_duplicates(new_version) validate_repo_version(new_version) + self.remove_pending_content(new_version) + + def remove_pending_content(self, repository_version): + """Remove pending blobs and manifests when committing the content to the repository.""" + added_content = repository_version.added( + base_version=repository_version.base_version + ).values_list("pk") + self.pending_blobs.remove(*Blob.objects.filter(pk__in=added_content)) + self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content)) class ContainerPushRepository(Repository, AutoAddObjPermsMixin): @@ -695,6 +602,22 @@ class ContainerPullThroughDistribution(Distribution, AutoAddObjPermsMixin): TYPE = "pull-through" + namespace = models.ForeignKey( + ContainerNamespace, + on_delete=models.CASCADE, + related_name="container_pull_through_distributions", + null=True, + ) + private = models.BooleanField( + default=False, + help_text=_( + "Restrict pull access to explicitly authorized users. " + "Related distributions inherit this value. " + "Defaults to unrestricted pull access." 
+ ), + ) + description = models.TextField(null=True) + class Meta: default_related_name = "%(app_label)s_%(model_name)s" permissions = [ diff --git a/pulp_container/app/registry.py b/pulp_container/app/registry.py index e8b7d7bbc..0e229a939 100644 --- a/pulp_container/app/registry.py +++ b/pulp_container/app/registry.py @@ -1,43 +1,42 @@ -import time import json import logging import os from asgiref.sync import sync_to_async +from contextlib import suppress from urllib.parse import urljoin from aiohttp import web +from aiohttp.client_exceptions import ClientResponseError from aiohttp.web_exceptions import HTTPTooManyRequests from django_guid import set_guid from django_guid.utils import generate_guid from django.conf import settings from django.core.exceptions import ObjectDoesNotExist +from django.db import IntegrityError from multidict import MultiDict from pulpcore.plugin.content import Handler, PathNotResolved -from pulpcore.plugin.models import Content, ContentArtifact, Task +from pulpcore.plugin.models import RemoteArtifact, Content, ContentArtifact from pulpcore.plugin.content import ArtifactResponse from pulpcore.plugin.tasking import dispatch from pulp_container.app.cache import RegistryContentCache -from pulp_container.app.models import ContainerDistribution, Tag, Blob +from pulp_container.app.models import ContainerDistribution, Tag, Blob, Manifest, BlobManifest from pulp_container.app.schema_convert import Schema2toSchema1ConverterWrapper from pulp_container.app.tasks import download_image_data from pulp_container.app.utils import ( calculate_digest, get_accepted_media_types, determine_media_type, + save_artifact, ) from pulp_container.constants import BLOB_CONTENT_TYPE, EMPTY_BLOB, MEDIA_TYPE, V2_ACCEPT_HEADERS log = logging.getLogger(__name__) -v2_headers = MultiDict() -v2_headers["Docker-Distribution-API-Version"] = "registry/2.0" - - class Registry(Handler): """ A set of handlers for the Container Registry v2 API. 
@@ -123,74 +122,61 @@ async def get_tag(self, request): repository_version = await sync_to_async(distribution.get_repository_version)() if not repository_version: raise PathNotResolved(tag_name) - accepted_media_types = get_accepted_media_types(request.headers) try: tag = await Tag.objects.select_related("tagged_manifest").aget( pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name ) except ObjectDoesNotExist: - if distribution.remote: - remote = await distribution.remote.acast() - - relative_url = "/v2/{name}/manifests/{tag}".format( - name=remote.namespaced_upstream_name, tag=tag_name + distribution = await distribution.acast() + if distribution.remote_id and distribution.pull_through_distribution_id: + pull_downloader = await PullThroughDownloader.create( + distribution, repository_version, path, tag_name ) - tag_url = urljoin(remote.url, relative_url) - downloader = remote.get_in_memory_downloader(url=tag_url) - response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}) - - set_guid(generate_guid()) - task = await sync_to_async(dispatch)( - download_image_data, - exclusive_resources=[repository_version.repository], - kwargs={ - "repository_pk": repository_version.repository.pk, - "remote_pk": remote.pk, - "tag_name": tag_name, - "response_data": response.data, - }, + raw_manifest, digest, media_type = await pull_downloader.download_manifest( + run_pipeline=True ) - - # waiting shortly for the task to be completed since a container client could - # request related content (i.e., manifests and blobs) and halt the pull operation - # if the content was not immediately served - for dummy in range(3): - time.sleep(1) - task = await Task.objects.aget(pk=task.pk) - if task.state == "completed": - await task.adelete() - break - elif task.state in ["waiting", "running"]: - continue - else: - error = task.error - await task.adelete() - raise Exception(str(error)) - else: - raise HTTPTooManyRequests() - - try: - manifest_data = json.loads(response.data) - except json.decoder.JSONDecodeError: - raise PathNotResolved(tag_name) - else: - encoded_data = response.data.encode("utf-8") - digest = calculate_digest(encoded_data) - media_type = determine_media_type(manifest_data, response) - - response_headers = { + headers = { "Content-Type": media_type, "Docker-Content-Digest": digest, + "Docker-Distribution-API-Version": "registry/2.0", } - - # at this time, the manifest artifact was already established, and we can return it - # as it is; meanwhile, the dispatched task has created Manifest/Blob objects and - # relations between them; the said content units are streamed/downloaded on demand - # to a client on a next run - return web.Response(text=response.data, headers=response_headers) + return web.Response(text=raw_manifest, headers=headers) else: raise PathNotResolved(tag_name) + else: + if distribution.remote_id and distribution.pull_through_distribution_id: + # check if the content was updated on the remote and stream it back + remote = await distribution.remote.acast() + relative_url = "/v2/{name}/manifests/{tag}".format( + name=remote.namespaced_upstream_name, tag=tag_name + ) + tag_url = urljoin(remote.url, relative_url) + downloader = remote.get_downloader(url=tag_url) + try: + response = await downloader.run( + extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} + ) + except ClientResponseError: + raise PathNotResolved(path) + + digest = response.headers.get("docker-content-digest") + if tag.tagged_manifest.digest != digest: + pull_downloader = 
await PullThroughDownloader.create( + distribution, repository_version, path, tag_name + ) + pull_downloader.downloader = downloader + raw_manifest, digest, media_type = await pull_downloader.download_manifest( + run_pipeline=True + ) + headers = { + "Content-Type": media_type, + "Docker-Content-Digest": digest, + "Docker-Distribution-API-Version": "registry/2.0", + } + return web.Response(text=raw_manifest, headers=headers) + + accepted_media_types = get_accepted_media_types(request.headers) # we do not convert OCI to docker oci_mediatypes = [MEDIA_TYPE.MANIFEST_OCI, MEDIA_TYPE.INDEX_OCI] @@ -228,7 +214,8 @@ async def dispatch_tag(self, request, tag, response_headers): """ - Finds an artifact associated with a Tag and sends it to the client. + Finds an artifact associated with a Tag and sends it to the client; otherwise, tries + to stream it. Args: request(:class:`~aiohttp.web.Request`): The request to prepare a response for. @@ -241,8 +228,13 @@ streamed back to the client. """ - artifact = await sync_to_async(tag.tagged_manifest._artifacts.get)() - return await Registry._dispatch(artifact, response_headers) + try: + artifact = await tag.tagged_manifest._artifacts.aget() + except ObjectDoesNotExist: + ca = await sync_to_async(lambda x: x[0])(tag.tagged_manifest.contentartifact_set.all()) + return await self._stream_content_artifact(request, web.StreamResponse(), ca) + else: + return await Registry._dispatch(artifact, response_headers) @staticmethod async def dispatch_converted_schema(tag, accepted_media_types, path): @@ -295,16 +287,14 @@ async def get_by_digest(self, request): raise PathNotResolved(path) if digest == EMPTY_BLOB: return await Registry._empty_blob() - try: - content = await sync_to_async(repository_version.get_content)() - repository = await sync_to_async(repository_version.repository.cast)() - if repository.PUSH_ENABLED: - pending_blobs = repository.pending_blobs.values_list("pk") - pending_manifests = repository.pending_manifests.values_list("pk") - pending_content = pending_blobs.union(pending_manifests) - content |= Content.objects.filter(pk__in=pending_content) + repository = await repository_version.repository.acast() + pending_blobs = repository.pending_blobs.values_list("pk") + pending_manifests = repository.pending_manifests.values_list("pk") + pending_content = pending_blobs.union(pending_manifests) + content = repository_version.content | Content.objects.filter(pk__in=pending_content) + try: ca = await ContentArtifact.objects.select_related("artifact", "content").aget( content__in=content, relative_path=digest ) @@ -318,7 +308,29 @@ "Docker-Content-Digest": ca_content.digest, } except ObjectDoesNotExist: - raise PathNotResolved(path) + distribution = await distribution.acast() + if distribution.remote_id and distribution.pull_through_distribution_id: + pull_downloader = await PullThroughDownloader.create( + distribution, repository_version, path, digest + ) + content_type = request.match_info["content"] + if content_type == "manifests": + raw_manifest, digest, media_type = await pull_downloader.download_manifest() + headers = { + "Content-Type": media_type, + "Docker-Content-Digest": digest, + "Docker-Distribution-API-Version": "registry/2.0", + } + return web.Response(text=raw_manifest, headers=headers) + else: + # there might be a case where the client has all the manifest data in place + # and tries to download only missing
blobs; because of that, only the reference + # to a remote blob is returned + blob = await pull_downloader.init_remote_blob() + ca = await blob.contentartifact_set.afirst() + return await self._stream_content_artifact(request, web.StreamResponse(), ca) + else: + raise PathNotResolved(path) else: artifact = ca.artifact if artifact: @@ -341,3 +353,179 @@ async def _empty_blob(): "Docker-Distribution-API-Version": "registry/2.0", } return web.Response(body=body, headers=response_headers) + + +class PullThroughDownloader: + def __init__(self, distribution, remote, repository, repository_version, path, identifier): + self.distribution = distribution + self.remote = remote + self.repository = repository + self.repository_version = repository_version + self.path = path + self.identifier = identifier + self.downloader = None + + @classmethod + async def create(cls, distribution, repository_version, path, identifier): + remote = await distribution.remote.acast() + repository = await repository_version.repository.acast() + return cls(distribution, remote, repository, repository_version, path, identifier) + + async def init_remote_blob(self): + return await self.save_blob(self.identifier, None) + + async def download_manifest(self, run_pipeline=False): + response = await self.run_manifest_downloader() + + with open(response.path) as f: + raw_data = f.read() + + response.artifact_attributes["file"] = response.path + saved_artifact = await save_artifact(response.artifact_attributes) + + if run_pipeline: + await self.run_pipeline(saved_artifact) + + try: + manifest_data = json.loads(raw_data) + except json.decoder.JSONDecodeError: + raise PathNotResolved(self.identifier) + media_type = determine_media_type(manifest_data, response) + if media_type in (MEDIA_TYPE.MANIFEST_V1_SIGNED, MEDIA_TYPE.MANIFEST_V1): + digest = calculate_digest(raw_data) + else: + digest = f"sha256:{response.artifact_attributes['sha256']}" + + if media_type not in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): + # add the manifest and blobs to the repository to be able to stream it + # in the next round when a client approaches the registry + await self.init_pending_content(digest, manifest_data, media_type, saved_artifact) + + return raw_data, digest, media_type + + async def run_manifest_downloader(self): + if self.downloader is None: + relative_url = "/v2/{name}/manifests/{identifier}".format( + name=self.remote.namespaced_upstream_name, identifier=self.identifier + ) + url = urljoin(self.remote.url, relative_url) + self.downloader = self.remote.get_downloader(url=url) + + try: + response = await self.downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS}) + except ClientResponseError as response_error: + if response_error.status == 429: + # the client could request the manifest outside the docker hub pull limit; + # it is necessary to pass this information back to the client + raise HTTPTooManyRequests() + else: + raise PathNotResolved(self.path) + + return response + + async def run_pipeline(self, saved_artifact): + set_guid(generate_guid()) + await sync_to_async(dispatch)( + download_image_data, + exclusive_resources=[self.repository_version.repository], + kwargs={ + "repository_pk": self.repository_version.repository.pk, + "remote_pk": self.remote.pk, + "manifest_artifact_pk": saved_artifact.pk, + "tag_name": self.identifier, + }, + ) + + async def init_pending_content(self, digest, manifest_data, media_type, artifact): + if config := manifest_data.get("config", None): + config_digest = config["digest"] + config_blob = 
await self.save_config_blob(config_digest) + await sync_to_async(self.repository.pending_blobs.add)(config_blob) + else: + config_blob = None + + manifest = Manifest( + digest=digest, + schema_version=2, + media_type=media_type, + config_blob=config_blob, + ) + try: + await manifest.asave() + except IntegrityError: + manifest = await Manifest.objects.aget(digest=manifest.digest) + await sync_to_async(manifest.touch)() + await sync_to_async(self.repository.pending_manifests.add)(manifest) + + for layer in manifest_data["layers"]: + blob = await self.save_blob(layer["digest"], manifest) + await sync_to_async(self.repository.pending_blobs.add)(blob) + + content_artifact = ContentArtifact( + artifact=artifact, content=manifest, relative_path=manifest.digest + ) + with suppress(IntegrityError): + await content_artifact.asave() + + async def save_blob(self, digest, manifest): + blob = Blob(digest=digest) + try: + await blob.asave() + except IntegrityError: + blob = await Blob.objects.aget(digest=digest) + await sync_to_async(blob.touch)() + + bm_rel = BlobManifest(manifest=manifest, manifest_blob=blob) + with suppress(IntegrityError): + await bm_rel.asave() + + ca = ContentArtifact( + content=blob, + artifact=None, + relative_path=digest, + ) + with suppress(IntegrityError): + await ca.asave() + + relative_url = "/v2/{name}/blobs/{digest}".format( + name=self.remote.namespaced_upstream_name, digest=digest + ) + blob_url = urljoin(self.remote.url, relative_url) + ra = RemoteArtifact( + url=blob_url, + sha256=digest[len("sha256:") :], + content_artifact=ca, + remote=self.remote, + ) + with suppress(IntegrityError): + await ra.asave() + + return blob + + async def save_config_blob(self, config_digest): + blob_relative_url = "/v2/{name}/blobs/{digest}".format( + name=self.remote.namespaced_upstream_name, digest=config_digest + ) + blob_url = urljoin(self.remote.url, blob_relative_url) + downloader = self.remote.get_downloader(url=blob_url) + response = await downloader.run() + + response.artifact_attributes["file"] = response.path + saved_artifact = await save_artifact(response.artifact_attributes) + + config_blob = Blob(digest=config_digest) + try: + await config_blob.asave() + except IntegrityError: + config_blob = await Blob.objects.aget(digest=config_digest) + await sync_to_async(config_blob.touch)() + + content_artifact = ContentArtifact( + content=config_blob, + artifact=saved_artifact, + relative_path=config_digest, + ) + with suppress(IntegrityError): + await content_artifact.asave() + + return config_blob diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index 7af3b5c92..1ce861424 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -91,6 +91,16 @@ log = logging.getLogger(__name__) +IGNORED_PULL_THROUGH_REMOTE_ATTRIBUTES = [ + "remote_ptr_id", + "pulp_type", + "pulp_last_updated", + "pulp_created", + "pulp_id", + "url", + "name", +] + class ContentRenderer(BaseRenderer): """ @@ -300,11 +310,13 @@ def get_pull_through_drv(self, path): name=path, retain_repo_versions=1 ) + remote_data = _get_pull_through_remote_data(root_cache_distribution) upstream_name = path.split(root_cache_distribution.base_path, maxsplit=1)[1] cache_remote, _ = models.ContainerRemote.objects.get_or_create( name=path, upstream_name=upstream_name.strip("/"), url=root_cache_distribution.remote.url, + **remote_data, ) cache_distribution, _ = models.ContainerDistribution.objects.get_or_create( @@ -370,6 +382,15 @@ def create_dr(self, path, request): 
return distribution, repository +def _get_pull_through_remote_data(root_cache_distribution): + remote_data = models.ContainerPullThroughRemote.objects.filter( + pk=root_cache_distribution.remote_id + ).values()[0] + for attr in IGNORED_PULL_THROUGH_REMOTE_ATTRIBUTES: + remote_data.pop(attr, None) + return remote_data + + class BearerTokenView(APIView): """ Hand out anonymous or authenticated bearer tokens. @@ -948,13 +969,10 @@ def handle_safe_method(self, request, path, pk): if pk == EMPTY_BLOB: return redirects.redirect_to_content_app("blobs", pk) repository = repository.cast() - if repository.PUSH_ENABLED: - try: - blob = repository.pending_blobs.get(digest=pk) - blob.touch() - except models.Blob.DoesNotExist: - raise BlobNotFound(digest=pk) - else: + try: + blob = repository.pending_blobs.get(digest=pk) + blob.touch() + except models.Blob.DoesNotExist: raise BlobNotFound(digest=pk) return redirects.issue_blob_redirect(blob) @@ -993,9 +1011,13 @@ def handle_safe_method(self, request, path, pk): try: tag = models.Tag.objects.get(name=pk, pk__in=repository_version.content) except models.Tag.DoesNotExist: - if distribution.remote: + distribution = distribution.cast() + if distribution.remote and distribution.pull_through_distribution_id: remote = distribution.remote.cast() repository = distribution.repository.cast() + # issue a head request first to ensure that the content exists on the remote + # source; we want to prevent immediate "not found" error responses from + # content-app: 302 (api-app) -> 404 (content-app) manifest = self.fetch_manifest(remote, repository_version, repository, pk) if manifest is None: return redirects.redirect_to_content_app("manifests", pk) @@ -1017,23 +1039,21 @@ def handle_safe_method(self, request, path, pk): manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) except models.Manifest.DoesNotExist: repository = repository.cast() - if repository.PUSH_ENABLED: - # the manifest might be a part of listed manifests currently being uploaded - try: - manifest = repository.pending_manifests.get(digest=pk) - manifest.touch() - except models.Manifest.DoesNotExist: - raise ManifestNotFound(reference=pk) - else: - if distribution.remote: - remote = distribution.remote.cast() - manifest = self.fetch_manifest(remote, repository_version, repository, pk) - if manifest is None: - return redirects.redirect_to_content_app("manifests", pk) + # the manifest might be a part of listed manifests currently being uploaded + # or saved during the pull-through caching + try: + manifest = repository.pending_manifests.get(digest=pk) + manifest.touch() + except models.Manifest.DoesNotExist: + pass - raise ManifestNotFound(reference=pk) - else: - raise ManifestNotFound(reference=pk) + distribution = distribution.cast() + if distribution.remote and distribution.pull_through_distribution_id: + remote = distribution.remote.cast() + self.fetch_manifest(remote, repository_version, repository, pk) + return redirects.redirect_to_content_app("manifests", pk) + else: + raise ManifestNotFound(reference=pk) return redirects.issue_manifest_redirect(manifest) @@ -1042,7 +1062,7 @@ def fetch_manifest(self, remote, repository, repository_version, pk): name=remote.namespaced_upstream_name, pk=pk ) tag_url = urljoin(remote.url, relative_url) - downloader = remote.get_in_memory_downloader(url=tag_url) + downloader = remote.get_downloader(url=tag_url) try: response = downloader.fetch( extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} @@ -1315,7 +1335,14 @@ def 
get(self, request, path, pk): manifest = repository.pending_manifests.get(digest=pk) manifest.touch() except models.Manifest.DoesNotExist: - raise ManifestNotFound(reference=pk) + # try one last time, since the pending data could have been + # removed after a pull-through caching task finished + try: + manifest = models.Manifest.objects.get( + digest=pk, pk__in=repository_version.content + ) + except models.Manifest.DoesNotExist: + raise ManifestNotFound(reference=pk) signatures = models.ManifestSignature.objects.filter( signed_manifest=manifest, pk__in=repository_version.content diff --git a/pulp_container/app/serializers.py b/pulp_container/app/serializers.py index 19c6f2642..0407bc3e7 100644 --- a/pulp_container/app/serializers.py +++ b/pulp_container/app/serializers.py @@ -282,11 +282,7 @@ class ContainerPullThroughRemoteSerializer(RemoteSerializer): """ A serializer for a remote used in the pull-through distribution. """ - policy = serializers.ChoiceField( - help_text="The policy always mimics the on_demand behaviour when performing pull-through.", - choices=((models.Remote.ON_DEMAND, "When syncing, download just the metadata.")), - default=models.Remote.ON_DEMAND, - ) + policy = serializers.ChoiceField(choices=[Remote.ON_DEMAND], default=Remote.ON_DEMAND) class Meta: fields = RemoteSerializer.Meta.fields @@ -329,7 +325,7 @@ class ContainerDistributionSerializer(DistributionSerializer, GetOrCreateSeriali required=False, help_text=_("Remote that can be used to fetch content when using pull-through caching."), view_name_pattern=r"remotes(-.*/.*)?-detail", - queryset=models.ContainerRemote.objects.all(), + read_only=True, ) def validate(self, data): @@ -399,6 +395,19 @@ class ContainerPullThroughDistributionSerializer(DistributionSerializer): view_name_pattern=r"remotes(-.*/.*)-detail", queryset=models.ContainerPullThroughRemote.objects.all(), ) + namespace = RelatedField( + required=False, + read_only=True, + view_name="pulp_container/namespaces-detail", + help_text=_("Namespace this distribution belongs to."), + ) + content_guard = DetailRelatedField( + required=False, + help_text=_("An optional content-guard. If none is specified, a default one will be used."), + view_name=r"contentguards-container/content-redirect-detail", + queryset=ContentRedirectContentGuard.objects.all(), + allow_null=False, + ) distributions = DetailRelatedField( many=True, help_text="Distributions created after pulling content through cache", view_name_pattern=r"distributions(-.*/.*)?-detail", queryset=models.ContainerDistribution.objects.all(), required=False, ) + description = serializers.CharField( + help_text=_("An optional description."), required=False, allow_null=True + ) + + def validate(self, data): + validated_data = super().validate(data) + + if "content_guard" not in validated_data: + validated_data["content_guard"] = ContentRedirectContentGuardSerializer.get_or_create( + {"name": "content redirect"} + ) + + base_path = validated_data.get("base_path") + if base_path: + namespace_name = base_path.split("/")[0] + validated_data["namespace"] = ContainerNamespaceSerializer.get_or_create( + {"name": namespace_name} + ) + + return validated_data class Meta: model = models.ContainerPullThroughDistribution - fields = DistributionSerializer.Meta.fields + ("remote", "distributions") + fields = tuple(set(DistributionSerializer.Meta.fields) - {"base_url"}) + ( + "remote", + "distributions", + "namespace", + "private", + "description", + ) class TagOperationSerializer(ValidateFieldsMixin, serializers.Serializer): @@ -758,6 +793,12 @@ class ContainerRepositorySyncURLSerializer(RepositorySyncURLSerializer): """ Serializer for Container Sync. """ + remote = DetailRelatedField( + required=False, + view_name_pattern=r"remotes(-.*/.*)-detail", + queryset=models.ContainerRemote.objects.all(), + help_text=_("A remote to sync from. This will override a remote set on the repository."), + ) signed_only = serializers.BooleanField( required=False, default=False, diff --git a/pulp_container/app/tasks/download_image_data.py b/pulp_container/app/tasks/download_image_data.py index f4076a391..6c62c5501 100644 --- a/pulp_container/app/tasks/download_image_data.py +++ b/pulp_container/app/tasks/download_image_data.py @@ -1,81 +1,58 @@ -import asyncio import json import logging -from tempfile import NamedTemporaryFile - -from asgiref.sync import sync_to_async - -from django.db import IntegrityError - from pulpcore.plugin.models import Artifact -from pulpcore.plugin.stages import ( - ArtifactDownloader, - ArtifactSaver, - DeclarativeContent, - DeclarativeVersion, - RemoteArtifactSaver, - ResolveContentFutures, - QueryExistingArtifacts, - QueryExistingContents, -) +from pulpcore.plugin.stages import DeclarativeContent from pulp_container.app.models import ContainerRemote, ContainerRepository, Tag from pulp_container.app.utils import determine_media_type_from_json from pulp_container.constants import MEDIA_TYPE -from .sync_stages import ContainerContentSaver, ContainerFirstStage +from .synchronize import ContainerDeclarativeVersion +from .sync_stages import ContainerFirstStage log = logging.getLogger(__name__) -def download_image_data(repository_pk, remote_pk, tag_name, response_data): +def download_image_data(repository_pk, remote_pk, manifest_artifact_pk, tag_name): repository = ContainerRepository.objects.get(pk=repository_pk) - remote = ContainerRemote.objects.get(pk=remote_pk).cast() + remote = ContainerRemote.objects.get(pk=remote_pk) + manifest_artifact = Artifact.objects.get(pk=manifest_artifact_pk) log.info("Pulling cache: repository={r} remote={p}".format(r=repository.name, p=remote.name)) - first_stage = ContainerPullThroughFirstStage(remote, tag_name, response_data) - dv = ContainerPullThroughCacheDeclarativeVersion(first_stage, repository, mirror=False) + first_stage = ContainerPullThroughFirstStage(remote, manifest_artifact, tag_name) + dv = ContainerDeclarativeVersion(first_stage, repository) return dv.create() class ContainerPullThroughFirstStage(ContainerFirstStage): - """The stage that prepares the pipeline for downloading a specific tag and its data.""" + """The stage that prepares the pipeline for downloading a single tag and its related data.""" - def __init__(self, remote, tag_name, response_data): - """Initialize the stage.""" + def __init__(self, remote, manifest_artifact, tag_name): + """Initialize the stage with the manifest artifact created in the content app.""" super().__init__(remote, signed_only=False) self.tag_name = tag_name - self.response_data = response_data + self.manifest_artifact = manifest_artifact async def run(self): - """Run the stage and set declarative content for one tag, its manifest, and blobs.""" - tag_dc = DeclarativeContent(Tag(name=self.tag_name)) + """Run the stage and create declarative content for one tag, its manifest, and blobs. - content_data = json.loads(self.response_data) - with NamedTemporaryFile("w") as temp_file: - temp_file.write(self.response_data) - temp_file.flush() + This method is a trimmed-down variant of ``ContainerFirstStage.run`` that syncs just + a single tag. + """ + tag_dc = DeclarativeContent(Tag(name=self.tag_name)) + self.tag_dcs.append(tag_dc) - artifact = Artifact.init_and_validate(temp_file.name) - try: - await artifact.asave() - except IntegrityError: - artifact = await Artifact.objects.aget(sha256=artifact.sha256) - await sync_to_async(artifact.touch)() + raw_data = self.manifest_artifact.file.read() + content_data = json.loads(raw_data) + self.manifest_artifact.file.close() media_type = determine_media_type_from_json(content_data) if media_type in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): list_dc = self.create_tagged_manifest_list( - self.tag_name, artifact, content_data, media_type + self.tag_name, self.manifest_artifact, content_data, media_type ) - for listed_manifest_task in asyncio.as_completed( - [ - self.create_listed_manifest(manifest_data) - for manifest_data in content_data.get("manifests") - ] - ): - listed_manifest = await listed_manifest_task - man_dc = listed_manifest["manifest_dc"] + for manifest_data in content_data.get("manifests"): + listed_manifest = await self.create_listed_manifest(manifest_data) list_dc.extra_data["listed_manifests"].append(listed_manifest) else: tag_dc.extra_data["tagged_manifest_dc"] = list_dc @@ -88,60 +65,10 @@ async def run(self): else: # Simple tagged manifest man_dc = self.create_tagged_manifest( - self.tag_name, artifact, content_data, self.response_data, media_type + self.tag_name, self.manifest_artifact, content_data, raw_data, media_type ) tag_dc.extra_data["tagged_manifest_dc"] = man_dc await self.handle_blobs(man_dc, content_data) self.manifest_dcs.append(man_dc) - for manifest_dc in self.manifest_dcs: - config_blob_dc = manifest_dc.extra_data.get("config_blob_dc") - if config_blob_dc: - manifest_dc.content.config_blob = await config_blob_dc.resolution() - for blob_dc in manifest_dc.extra_data["blob_dcs"]: - # Just await here. They will be associated in the post_save hook.
- await blob_dc.resolution() - await self.put(manifest_dc) - self.manifest_dcs.clear() - - for manifest_list_dc in self.manifest_list_dcs: - for listed_manifest in manifest_list_dc.extra_data["listed_manifests"]: - # Just await here. They will be associated in the post_save hook. - await listed_manifest["manifest_dc"].resolution() - await self.put(manifest_list_dc) - self.manifest_list_dcs.clear() - - tagged_manifest_dc = tag_dc.extra_data["tagged_manifest_dc"] - tag_dc.content.tagged_manifest = await tagged_manifest_dc.resolution() - await self.put(tag_dc) - - -class ContainerPullThroughCacheDeclarativeVersion(DeclarativeVersion): - """ - Subclassed Declarative version that creates a pipeline for caching remote content. - """ - - def pipeline_stages(self, new_version): - """ - Define the "architecture" of caching remote content. - - Args: - new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The - new repository version that is going to be built. - - Returns: - list: List of :class:`~pulpcore.plugin.stages.Stage` instances - - """ - pipeline = [ - self.first_stage, - QueryExistingArtifacts(), - ArtifactDownloader(), - ArtifactSaver(), - QueryExistingContents(), - ContainerContentSaver(), - RemoteArtifactSaver(), - ResolveContentFutures(), - ] - - return pipeline + await self.resolve_flush() diff --git a/pulp_container/app/tasks/sync_stages.py b/pulp_container/app/tasks/sync_stages.py index 8782b8073..100ab961f 100644 --- a/pulp_container/app/tasks/sync_stages.py +++ b/pulp_container/app/tasks/sync_stages.py @@ -9,7 +9,6 @@ from urllib.parse import urljoin, urlparse, urlunparse from asgiref.sync import sync_to_async -from django.db import IntegrityError from pulpcore.plugin.models import Artifact, ProgressReport, Remote from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage, ContentSaver @@ -30,6 +29,7 @@ Tag, ) from pulp_container.app.utils import ( + save_artifact, extract_data_from_signature, urlpath_sanitize, determine_media_type, @@ -40,17 +40,6 @@ log = logging.getLogger(__name__) -async def _save_artifact(artifact_attributes): - saved_artifact = Artifact(**artifact_attributes) - try: - await saved_artifact.asave() - except IntegrityError: - del artifact_attributes["file"] - saved_artifact = await Artifact.objects.aget(**artifact_attributes) - await sync_to_async(saved_artifact.touch)() - return saved_artifact - - class ContainerFirstStage(Stage): """ The first stage of a pulp_container sync pipeline. 
@@ -84,7 +73,7 @@ async def _download_and_save_artifact_data(self, manifest_url): raw_data = content_file.read() response.artifact_attributes["file"] = response.path - saved_artifact = await _save_artifact(response.artifact_attributes) + saved_artifact = await save_artifact(response.artifact_attributes) content_data = json.loads(raw_data) return saved_artifact, content_data, raw_data, response @@ -157,7 +146,7 @@ async def run(self): for artifact in asyncio.as_completed(to_download_artifact): saved_artifact, content_data, raw_data, response = await artifact - digest = response.artifact_attributes["sha256"] + digest = saved_artifact.sha256 # Look for cosign signatures # cosign signature has a tag convention 'sha256-1234.sig' diff --git a/pulp_container/app/utils.py b/pulp_container/app/utils.py index aa1711331..3e7f64206 100644 --- a/pulp_container/app/utils.py +++ b/pulp_container/app/utils.py @@ -7,10 +7,12 @@ import logging import time +from asgiref.sync import sync_to_async from jsonschema import Draft7Validator, validate, ValidationError +from django.db import IntegrityError from rest_framework.exceptions import Throttled -from pulpcore.plugin.models import Task +from pulpcore.plugin.models import Artifact, Task from pulp_container.constants import ALLOWED_ARTIFACT_TYPES, MANIFEST_MEDIA_TYPES, MEDIA_TYPE from pulp_container.app.exceptions import ManifestInvalid @@ -288,3 +290,14 @@ def pad_unpadded_b64(unpadded_b64): # Add back the missing padding characters, based on the length of the encoded string paddings = {0: "", 2: "==", 3: "="} return unpadded_b64 + paddings[len(unpadded_b64) % 4] + + +async def save_artifact(artifact_attributes): + saved_artifact = Artifact(**artifact_attributes) + try: + await saved_artifact.asave() + except IntegrityError: + del artifact_attributes["file"] + saved_artifact = await Artifact.objects.aget(**artifact_attributes) + await sync_to_async(saved_artifact.touch)() + return saved_artifact diff --git a/pulp_container/tests/functional/api/test_pull_through_cache.py b/pulp_container/tests/functional/api/test_pull_through_cache.py index bc0bb7244..6a623d72d 100644 --- a/pulp_container/tests/functional/api/test_pull_through_cache.py +++ b/pulp_container/tests/functional/api/test_pull_through_cache.py @@ -1,3 +1,4 @@ +import time import subprocess import pytest @@ -7,6 +8,7 @@ REGISTRY_V2, REGISTRY_V2_FEED_URL, PULP_HELLO_WORLD_REPO, + PULP_FIXTURE_1, ) @@ -27,7 +29,8 @@ def pull_through_distribution( return distribution -def test_image_pull( +@pytest.fixture +def pull_and_verify( add_to_cleanup, container_pull_through_distribution_api, container_distribution_api, @@ -36,51 +39,67 @@ def test_image_pull( container_tag_api, registry_client, local_registry, - pull_through_distribution, ): - remote_image_path = f"{REGISTRY_V2}/{PULP_HELLO_WORLD_REPO}" - registry_client.pull(f"{remote_image_path}:latest") - remote_image = registry_client.inspect(remote_image_path) + def _pull_and_verify(images, pull_through_distribution): + tags_to_verify = [] + for version, image_path in enumerate(images, start=1): + remote_image_path = f"{REGISTRY_V2}/{image_path}" + local_image_path = f"{pull_through_distribution.base_path}/{image_path}" + + # 1. 
pull remote content through the pull-through distribution + local_registry.pull(local_image_path) + local_image = local_registry.inspect(local_image_path) + + # when the client pulls the image, a repository, distribution, and remote are created in + # the background; therefore, scheduling the cleanup for these entities is necessary + path, tag = local_image_path.split(":") + tags_to_verify.append(tag) + repository = container_repository_api.list(name=path).results[0] + add_to_cleanup(container_repository_api, repository.pulp_href) + remote = container_remote_api.list(name=path).results[0] + add_to_cleanup(container_remote_api, remote.pulp_href) + distribution = container_distribution_api.list(name=path).results[0] + add_to_cleanup(container_distribution_api, distribution.pulp_href) - local_image_path = f"{pull_through_distribution.base_path}/{PULP_HELLO_WORLD_REPO}" - local_registry.pull(f"{local_image_path}:latest") - local_image = local_registry.inspect(local_image_path) + pull_through_distribution = container_pull_through_distribution_api.list( + name=pull_through_distribution.name + ).results[0] + assert [distribution.pulp_href] == pull_through_distribution.distributions - # when the client pulls the image, a repository, distribution, and remote is created in - # the background; therefore, scheduling the cleanup for these entities is necessary - repository = container_repository_api.list(name=local_image_path).results[0] - add_to_cleanup(container_repository_api, repository.pulp_href) - remote = container_remote_api.list(name=local_image_path).results[0] - add_to_cleanup(container_remote_api, remote.pulp_href) - distribution = container_distribution_api.list(name=local_image_path).results[0] - add_to_cleanup(container_distribution_api, distribution.pulp_href) + # 2. verify that the pulled content is the same as on the remote + registry_client.pull(remote_image_path) + remote_image = registry_client.inspect(remote_image_path) + assert local_image[0]["Id"] == remote_image[0]["Id"] - assert local_image[0]["Id"] == remote_image[0]["Id"] + # 3. check if the repository version has changed + for _ in range(5): + repository = container_repository_api.list(name=path).results[0] + if f"{repository.pulp_href}versions/{version}/" == repository.latest_version_href: + break - tags = container_tag_api.list(repository_version=repository.latest_version_href).results - assert ["latest"] == [tag.name for tag in tags] + # the saving process might still be running in the background + time.sleep(1) + else: + assert False, "The repository was not updated with the cached content." - pull_through_distribution = container_pull_through_distribution_api.list( - name=pull_through_distribution.name - ).results[0] - assert [distribution.pulp_href] == pull_through_distribution.distributions + # 4. test that pulling the same content twice does not raise any error + local_registry.pull(local_image_path) - assert f"{repository.pulp_href}versions/1/" == repository.latest_version_href + # 5. assert the cached tags + tags = container_tag_api.list(repository_version=repository.latest_version_href).results + assert sorted(tags_to_verify) == sorted([tag.name for tag in tags]) - # 1. test if pulling the same content twice works - local_registry.pull(f"{local_image_path}:latest") + return _pull_and_verify - repository = container_repository_api.list(name=local_image_path).results[0] - assert f"{repository.pulp_href}versions/1/" == repository.latest_version_href - # 2.
test if pulling new content results into a new version, preserving the old content - local_registry.pull(f"{local_image_path}:linux") +def test_manifest_list_pull(delete_orphans_pre, pull_through_distribution, pull_and_verify): + images = [f"{PULP_HELLO_WORLD_REPO}:latest", f"{PULP_HELLO_WORLD_REPO}:linux"] + pull_and_verify(images, pull_through_distribution) - repository = container_repository_api.list(name=local_image_path).results[0] - assert f"{repository.pulp_href}versions/2/" == repository.latest_version_href - tags = container_tag_api.list(repository_version=repository.latest_version_href).results - assert ["latest", "linux"] == sorted([tag.name for tag in tags]) +def test_manifest_pull(delete_orphans_pre, pull_through_distribution, pull_and_verify): + images = [f"{PULP_FIXTURE_1}:manifest_a", f"{PULP_FIXTURE_1}:manifest_b"] + pull_and_verify(images, pull_through_distribution) def test_conflicting_names_and_paths( @@ -97,6 +116,7 @@ def test_conflicting_names_and_paths( local_image_path = f"{pull_through_distribution.base_path}/{str(uuid4())}" remote = container_remote_factory(name=local_image_path) + # a remote with the same name but a different URL already exists with pytest.raises(subprocess.CalledProcessError): local_registry.pull(local_image_path) monitor_task(container_remote_api.delete(remote.pulp_href).task) @@ -105,6 +125,7 @@ def test_conflicting_names_and_paths( assert 0 == len(container_distribution_api.list(name=local_image_path).results) repository = container_repository_factory(name=local_image_path) + # a repository with the same name but a different retain configuration already exists with pytest.raises(subprocess.CalledProcessError): local_registry.pull(local_image_path) monitor_task(container_repository_api.delete(repository.pulp_href).task) @@ -114,6 +135,7 @@ def test_conflicting_names_and_paths( data = {"name": local_image_path, "base_path": local_image_path} distribution = gen_object_with_cleanup(container_distribution_api, data) + # a distribution with the same name but different foreign keys already exists with pytest.raises(subprocess.CalledProcessError): local_registry.pull(local_image_path) monitor_task(container_distribution_api.delete(distribution.pulp_href).task) diff --git a/requirements.txt b/requirements.txt index 4e8130902..e78158fea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ ecdsa>=0.14,<=0.18.0 jsonschema>=4.4,<4.21 -pulpcore>=3.40.3,<3.55 +pulpcore>=3.43.0,<3.55 pyjwkest>=1.4,<=1.4.2 pyjwt[crypto]>=2.4,<2.9
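The configuration workflow added to ``docs/workflows/host.rst`` above can also be driven from Python. A minimal sketch, assuming a local Pulp instance at ``localhost:24817`` with ``admin``/``password`` credentials; the endpoint paths come from the docs changes in this diff, everything else is illustrative::

    import requests

    BASE_ADDR = "http://localhost:24817"  # assumed Pulp API address
    AUTH = ("admin", "password")  # assumed credentials

    # create a pull-through remote; no upstream name is required
    remote = requests.post(
        f"{BASE_ADDR}/pulp/api/v3/remotes/container/pull-through/",
        json={"name": "docker-cache", "url": "https://registry-1.docker.io"},
        auth=AUTH,
    ).json()

    # create a pull-through distribution linked to the remote; Pulp responds
    # with a task that should be waited on before the distribution is used
    requests.post(
        f"{BASE_ADDR}/pulp/api/v3/distributions/container/pull-through/",
        json={
            "name": "docker-cache",
            "base_path": "docker-cache",
            "remote": remote["pulp_href"],
        },
        auth=AUTH,
    )

    # clients can then pull through the cache, e.g.:
    #   podman pull localhost:24817/docker-cache/library/busybox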
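The freshness check that ``Registry.get_tag`` now performs for already-cached tags can be pictured in isolation: a HEAD request for the manifest is issued upstream and the returned ``docker-content-digest`` header is compared with the digest Pulp has cached. A standalone sketch under the assumption that a valid Bearer token is already at hand; the function name and its parameters are illustrative, not part of the plugin::

    import requests

    # manifest media types to accept, similar in spirit to the plugin's
    # V2_ACCEPT_HEADERS constant
    ACCEPT = ", ".join(
        [
            "application/vnd.docker.distribution.manifest.v2+json",
            "application/vnd.docker.distribution.manifest.list.v2+json",
            "application/vnd.oci.image.manifest.v1+json",
            "application/vnd.oci.image.index.v1+json",
        ]
    )

    def cached_tag_is_stale(upstream_url, name, tag, cached_digest, token):
        """Return True when the upstream digest no longer matches the cached one."""
        response = requests.head(
            f"{upstream_url}/v2/{name}/manifests/{tag}",
            headers={"Accept": ACCEPT, "Authorization": f"Bearer {token}"},
        )
        response.raise_for_status()
        # registries advertise the manifest digest in this response header
        return response.headers.get("docker-content-digest") != cached_digest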