diff --git a/CHANGES/469.feature b/CHANGES/469.feature new file mode 100644 index 000000000..17be7b683 --- /dev/null +++ b/CHANGES/469.feature @@ -0,0 +1 @@ +Added support for pushing manifest lists via the Registry API. diff --git a/docs/workflows/push.rst b/docs/workflows/push.rst index b649ee8b9..270d5c811 100644 --- a/docs/workflows/push.rst +++ b/docs/workflows/push.rst @@ -3,10 +3,10 @@ Push content to a Repository ============================= -Users can push images to the repositories hosted by the Container Registry. It is possible to push -images that container foreign ( non-distributable) layers. Only the users who are logged in to the -registry are allowed to perform push operation. Find below a complete example of pushing a tagged -image. +Users can push images (manifests and manifest lists) to repositories hosted by the Container +Registry. It is possible to push images that container foreign (non-distributable) layers. Only the +users who are logged in to the registry are allowed to perform push operation. Find below a complete +example of pushing a tagged image. .. note:: Having disabled the token authentication, only users with staff privileges (i.e., diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index e459d1e35..83a911571 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -11,7 +11,6 @@ import hashlib import re from collections import namedtuple -import time from urllib.parse import urlparse, urlunparse, parse_qs, urlencode from tempfile import NamedTemporaryFile @@ -69,7 +68,7 @@ RegistryPermission, TokenPermission, ) -from pulp_container.app.utils import extract_data_from_signature +from pulp_container.app.utils import extract_data_from_signature, has_task_completed from pulp_container.constants import ( EMPTY_BLOB, SIGNATURE_API_EXTENSION_VERSION, @@ -673,20 +672,8 @@ def put(self, request, path, pk=None): }, ) - # Wait a small amount of time - for dummy in range(3): - time.sleep(1) - task = Task.objects.get(pk=dispatched_task.pk) - if task.state == "completed": - task.delete() - return BlobResponse(blob, path, 201, request) - elif task.state in ["waiting", "running"]: - continue - else: - error = task.error - task.delete() - raise Exception(str(error)) - raise Throttled() + if has_task_completed(dispatched_task): + return BlobResponse(blob, path, 201, request) else: raise Exception("The digest did not match") @@ -790,83 +777,126 @@ def put(self, request, path, pk=None): """ Responds with the actual manifest """ - _, repository = self.get_dr_push(request, path) + # when a user uploads a manifest list with zero listed manifests (no blobs were uploaded) + # and the specified repository has not been created yet, create the repository without + # raising an error + create_new_repo = request.content_type in ( + models.MEDIA_TYPE.MANIFEST_LIST, models.MEDIA_TYPE.INDEX_OCI + ) + _, repository = self.get_dr_push(request, path, create=create_new_repo) # iterate over all the layers and create chunk = request.META["wsgi.input"] artifact = self.receive_artifact(chunk) manifest_digest = "sha256:{id}".format(id=artifact.sha256) - with storage.open(artifact.file.name) as artifact_file: - raw_data = artifact_file.read() - content_data = json.loads(raw_data) + # oci format might not contain mediaType in the manifest.json, docker should # hence need to check request content type if request.content_type not in ( models.MEDIA_TYPE.MANIFEST_V2, models.MEDIA_TYPE.MANIFEST_OCI, + models.MEDIA_TYPE.MANIFEST_LIST, + models.MEDIA_TYPE.INDEX_OCI, ): - # we suport only v2 docker/oci schema upload raise ManifestInvalid(digest=manifest_digest) - # both docker/oci format should contain config, digest, mediaType, size - config_layer = content_data.get("config") - config_media_type = config_layer.get("mediaType") - config_digest = config_layer.get("digest") - if config_media_type not in ( - models.MEDIA_TYPE.CONFIG_BLOB, - models.MEDIA_TYPE.CONFIG_BLOB_OCI, + + with storage.open(artifact.file.name) as artifact_file: + raw_data = artifact_file.read() + + content_data = json.loads(raw_data) + + if request.content_type in ( + models.MEDIA_TYPE.MANIFEST_LIST, + models.MEDIA_TYPE.INDEX_OCI, ): - raise BlobInvalid(digest=config_digest) - try: - config_blob = models.Blob.objects.get(digest=config_digest) - except models.Blob.DoesNotExist: - raise BlobInvalid(digest=config_digest) - - # both docker/oci format should contain layers, digest, media_type, size - layers = content_data.get("layers") - blobs = set() - for layer in layers: - media_type = layer.get("mediaType") - urls = layer.get("urls") - digest = layer.get("digest") - if ( - media_type - in ( - models.MEDIA_TYPE.FOREIGN_BLOB, - models.MEDIA_TYPE.FOREIGN_BLOB_OCI, + manifest_list = self._save_manifest(artifact, manifest_digest, request) + + manifests_to_list = [] + for manifest_metadata in content_data.get("manifests"): + manifest = models.Manifest.objects.get(digest=manifest_metadata["digest"]) + + platform = manifest_metadata["platform"] + manifest_to_list = models.ManifestListManifest( + manifest_list=manifest, + image_manifest=manifest_list, + architecture=platform["architecture"], + os=platform["os"], + features=platform.get("features", ""), + variant=platform.get("variant", ""), + os_version=platform.get("os.version", ""), + os_features=platform.get("os.features", ""), ) - and not urls - ): - raise ManifestInvalid(digest=manifest_digest) - if media_type not in ( - models.MEDIA_TYPE.REGULAR_BLOB, - models.MEDIA_TYPE.REGULAR_BLOB_OCI, + manifests_to_list.append(manifest_to_list) + + models.ManifestListManifest.objects.bulk_create( + manifests_to_list, ignore_conflicts=True, batch_size=1000 + ) + manifest = manifest_list + else: + # both docker/oci format should contain config, digest, mediaType, size + config_layer = content_data.get("config") + config_media_type = config_layer.get("mediaType") + config_digest = config_layer.get("digest") + if config_media_type not in ( + models.MEDIA_TYPE.CONFIG_BLOB, + models.MEDIA_TYPE.CONFIG_BLOB_OCI, ): - raise BlobInvalid(digest=digest) - blobs.add(digest) - - manifest = models.Manifest( - digest=manifest_digest, - schema_version=2, - media_type=request.content_type, - config_blob=config_blob, - ) - try: - manifest.save() - except IntegrityError: - manifest = models.Manifest.objects.get(digest=manifest.digest) - manifest.touch() - ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest) - try: - ca.save() - except IntegrityError: - ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest) + raise BlobInvalid(digest=config_digest) + try: + config_blob = models.Blob.objects.get(digest=config_digest) + except models.Blob.DoesNotExist: + raise BlobInvalid(digest=config_digest) + + # both docker/oci format should contain layers, digest, media_type, size + layers = content_data.get("layers") + blobs = set() + for layer in layers: + media_type = layer.get("mediaType") + urls = layer.get("urls") + digest = layer.get("digest") + if ( + media_type + in ( + models.MEDIA_TYPE.FOREIGN_BLOB, + models.MEDIA_TYPE.FOREIGN_BLOB_OCI, + ) + and not urls + ): + raise ManifestInvalid(digest=manifest_digest) + if media_type not in ( + models.MEDIA_TYPE.REGULAR_BLOB, + models.MEDIA_TYPE.REGULAR_BLOB_OCI, + ): + raise BlobInvalid(digest=digest) + blobs.add(digest) + + manifest = models.Manifest( + digest=manifest_digest, + schema_version=2, + media_type=request.content_type, + config_blob=config_blob, + ) + try: + manifest.save() + except IntegrityError: + manifest = models.Manifest.objects.get(digest=manifest.digest) + manifest.touch() + ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest) + try: + ca.save() + except IntegrityError: + ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest) if not ca.artifact: ca.artifact = artifact ca.save(update_fields=["artifact"]) - blobs_qs = models.Blob.objects.filter(digest__in=blobs) - thru = [] - for blob in blobs_qs: - thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob)) - models.BlobManifest.objects.bulk_create(objs=thru, ignore_conflicts=True, batch_size=1000) + + blobs_qs = models.Blob.objects.filter(digest__in=blobs) + thru = [] + for blob in blobs_qs: + thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob)) + models.BlobManifest.objects.bulk_create( + objs=thru, ignore_conflicts=True, batch_size=1000 + ) + tag = models.Tag(name=pk, tagged_manifest=manifest) try: tag.save() @@ -887,20 +917,31 @@ def put(self, request, path, pk=None): }, ) - # Wait a small amount of time - for dummy in range(3): - time.sleep(1) - task = Task.objects.get(pk=dispatched_task.pk) - if task.state == "completed": - task.delete() - return ManifestResponse(manifest, path, request, status=201) - elif task.state in ["waiting", "running"]: - continue - else: - error = task.error - task.delete() - raise Exception(str(error)) - raise Throttled() + if has_task_completed(dispatched_task): + return ManifestResponse(manifest, path, request, status=201) + + def _save_manifest(self, artifact, manifest_digest, request): + manifest_list = models.Manifest( + digest=manifest_digest, schema_version=2, media_type=request.content_type + ) + try: + manifest_list.save() + except IntegrityError: + manifest_list = models.Manifest.objects.get(digest=manifest_list.digest) + manifest_list.touch() + ca = ContentArtifact( + artifact=artifact, content=manifest_list, relative_path=manifest_list.digest + ) + try: + ca.save() + except IntegrityError: + ca = ContentArtifact.objects.get( + content=manifest_list, relative_path=manifest_list.digest + ) + if not ca.artifact: + ca.artifact = artifact + ca.save(update_fields=["artifact"]) + return manifest_list def receive_artifact(self, chunk): """Handles assembling of Manifest as it's being uploaded.""" @@ -1024,18 +1065,5 @@ def put(self, request, path, pk): }, ) - # wait a small amount of time until a new repository version - # with the new signature is created - for dummy in range(3): - time.sleep(1) - task = Task.objects.get(pk=dispatched_task.pk) - if task.state == "completed": - task.delete() - return ManifestSignatureResponse(signature, path) - elif task.state in ["waiting", "running"]: - continue - else: - error = task.error - task.delete() - raise Exception(str(error)) - raise Throttled() + if has_task_completed(dispatched_task): + return ManifestSignatureResponse(signature, path) diff --git a/pulp_container/app/utils.py b/pulp_container/app/utils.py index d94a204a4..4754bb804 100644 --- a/pulp_container/app/utils.py +++ b/pulp_container/app/utils.py @@ -3,6 +3,11 @@ import gnupg import json import logging +import time + +from rest_framework.exceptions import Throttled + +from pulpcore.plugin.models import Task from pulp_container.constants import SIGNATURE_TYPE @@ -99,3 +104,30 @@ def extract_data_from_signature(signature_raw, man_digest): sig_json["signing_key_id"] = crypt_obj.key_id sig_json["signature_timestamp"] = crypt_obj.timestamp return sig_json + + +def has_task_completed(dispatched_task): + """ + Wait a couple of seconds until the task finishes its run. + + Returns: + bool: True if the task ends successfully. + + Raises: + Exception: If an error occurs during the task's runtime. + Throttled: If the task did not finish within a predefined timespan. + + """ + for dummy in range(3): + time.sleep(1) + task = Task.objects.get(pk=dispatched_task.pk) + if task.state == "completed": + task.delete() + return True + elif task.state in ["waiting", "running"]: + continue + else: + error = task.error + task.delete() + raise Exception(str(error)) + raise Throttled() diff --git a/pulp_container/tests/functional/api/test_push_content.py b/pulp_container/tests/functional/api/test_push_content.py index 9d2720683..c3fbe61a7 100644 --- a/pulp_container/tests/functional/api/test_push_content.py +++ b/pulp_container/tests/functional/api/test_push_content.py @@ -10,6 +10,9 @@ monitor_task, PulpTestCase, ) + +from pulp_container.constants import MEDIA_TYPE + from pulp_container.tests.functional.api import rbac_base from pulp_container.tests.functional.constants import REGISTRY_V2_REPO_PULP from pulp_container.tests.functional.utils import ( @@ -19,6 +22,9 @@ ) from pulpcore.client.pulp_container import ( + ContentManifestsApi, + ContentTagsApi, + DistributionsContainerApi, PulpContainerNamespacesApi, RepositoriesContainerPushApi, ) @@ -317,3 +323,195 @@ def test_matching_username(self): # cleanup, namespace removal also removes related distributions namespace = self.namespace_api.list(name=namespace_name).results[0] self.addCleanup(self.namespace_api.delete, namespace.pulp_href) + + +class PushManifestListTestCase(PulpTestCase, rbac_base.BaseRegistryTest): + """A test case that verifies if a container client can push manifest lists to the registry.""" + + @classmethod + def setUpClass(cls): + """Initialize a new manifest list that will be pushed to the registry.""" + cfg = config.get_config() + cls.registry = cli.RegistryClient(cfg) + cls.registry.raise_if_unsupported(unittest.SkipTest, "Tests require podman/docker") + cls.registry_name = urlparse(cfg.get_base_url()).netloc + + admin_user, admin_password = cfg.pulp_auth + cls.user_admin = {"username": admin_user, "password": admin_password} + + api_client = gen_container_client() + api_client.configuration.username = cls.user_admin["username"] + api_client.configuration.password = cls.user_admin["password"] + cls.pushrepository_api = RepositoriesContainerPushApi(api_client) + cls.distributions_api = DistributionsContainerApi(api_client) + cls.manifests_api = ContentManifestsApi(api_client) + cls.tags_api = ContentTagsApi(api_client) + + cls.manifest_a = f"{REGISTRY_V2_REPO_PULP}:manifest_a" + cls.manifest_b = f"{REGISTRY_V2_REPO_PULP}:manifest_b" + cls.manifest_c = f"{REGISTRY_V2_REPO_PULP}:manifest_c" + cls._pull(cls.manifest_a) + cls._pull(cls.manifest_b) + cls._pull(cls.manifest_c) + + # get default manifests' digests for the further comparison + manifest_a_digest = cls.registry.inspect(cls.manifest_a)[0]["Digest"] + manifest_b_digest = cls.registry.inspect(cls.manifest_b)[0]["Digest"] + manifest_c_digest = cls.registry.inspect(cls.manifest_c)[0]["Digest"] + cls.manifests_v2s2_digests = sorted( + [manifest_a_digest, manifest_b_digest, manifest_c_digest] + ) + + # create a new manifest list composed of the pulled manifest images + cls.image_v2s2_tag = "manifest_list" + cls.image_v2s2_path = f"{REGISTRY_V2_REPO_PULP}:{cls.image_v2s2_tag}" + cls.local_v2s2_url = f"{cls.registry_name}/foo:{cls.image_v2s2_tag}" + cls.registry._dispatch_command("manifest", "create", cls.image_v2s2_path) + cls.registry._dispatch_command("manifest", "add", cls.image_v2s2_path, cls.manifest_a) + cls.registry._dispatch_command("manifest", "add", cls.image_v2s2_path, cls.manifest_b) + cls.registry._dispatch_command("manifest", "add", cls.image_v2s2_path, cls.manifest_c) + + # get digests of manifests after converting images to the OCI format by reloading them + cls.registry._dispatch_command( + "save", cls.manifest_a, "--format", "oci-dir", "-o", "manifest_a.tar" + ) + cls.registry._dispatch_command( + "save", cls.manifest_b, "--format", "oci-dir", "-o", "manifest_b.tar" + ) + cls.registry._dispatch_command( + "save", cls.manifest_c, "--format", "oci-dir", "-o", "manifest_c.tar" + ) + + cls.registry._dispatch_command("load", "-q", "-i", "manifest_a.tar") + cls.registry._dispatch_command("load", "-q", "-i", "manifest_b.tar") + cls.registry._dispatch_command("load", "-q", "-i", "manifest_c.tar") + + manifest_a_digest = cls.registry.inspect("manifest_a.tar")[0]["Digest"] + manifest_b_digest = cls.registry.inspect("manifest_b.tar")[0]["Digest"] + manifest_c_digest = cls.registry.inspect("manifest_c.tar")[0]["Digest"] + cls.manifests_oci_digests = sorted( + [manifest_a_digest, manifest_b_digest, manifest_c_digest] + ) + + # create an empty manifest list + cls.empty_image_tag = "empty_manifest_list" + cls.empty_image_path = f"{REGISTRY_V2_REPO_PULP}:{cls.empty_image_tag}" + cls.empty_image_local_url = f"{cls.registry_name}/foo:{cls.empty_image_tag}" + cls.registry._dispatch_command("manifest", "create", cls.empty_image_path) + + @classmethod + def tearDownClass(cls): + """Clean up created images.""" + cls.registry._dispatch_command("manifest", "rm", cls.image_v2s2_path) + cls.registry._dispatch_command("manifest", "rm", cls.empty_image_path) + + cls.registry._dispatch_command("image", "rm", cls.manifest_a) + cls.registry._dispatch_command("image", "rm", cls.manifest_b) + cls.registry._dispatch_command("image", "rm", cls.manifest_c) + + cls.registry._dispatch_command("image", "rm", "localhost/manifest_a.tar") + cls.registry._dispatch_command("image", "rm", "localhost/manifest_b.tar") + cls.registry._dispatch_command("image", "rm", "localhost/manifest_c.tar") + + delete_orphans() + + def test_push_manifest_list_v2s2(self): + """Push the created manifest list in the v2s2 format.""" + self.registry.login( + "-u", self.user_admin["username"], "-p", self.user_admin["password"], self.registry_name + ) + self.registry._dispatch_command( + "manifest", + "push", + self.image_v2s2_path, + self.local_v2s2_url, + "--all", + "--format", + "v2s2", + ) + + # pushing the same manifest list two times should not fail + self.registry._dispatch_command( + "manifest", + "push", + self.image_v2s2_path, + self.local_v2s2_url, + "--all", + "--format", + "v2s2", + ) + + distribution = self.distributions_api.list(name="foo").results[0] + self.addCleanup(self.distributions_api.delete, distribution.pulp_href) + + repo_version = self.pushrepository_api.read(distribution.repository).latest_version_href + latest_tag = self.tags_api.list(repository_version_added=repo_version).results[0] + assert latest_tag.name == self.image_v2s2_tag + + manifest_list = self.manifests_api.read(latest_tag.tagged_manifest) + assert manifest_list.media_type == MEDIA_TYPE.MANIFEST_LIST + assert manifest_list.schema_version == 2 + + referenced_manifests_digests = sorted( + [ + self.manifests_api.read(manifest_href).digest + for manifest_href in manifest_list.listed_manifests + ] + ) + assert referenced_manifests_digests == self.manifests_v2s2_digests + + def test_push_manifest_list_oci(self): + """Push the created manifest list in the OCI format.""" + self.registry.login( + "-u", self.user_admin["username"], "-p", self.user_admin["password"], self.registry_name + ) + self.registry._dispatch_command( + "manifest", + "push", + self.image_v2s2_path, + self.local_v2s2_url, + "--all", + "--format", + "oci", + ) + + distribution = self.distributions_api.list(name="foo").results[0] + self.addCleanup(self.distributions_api.delete, distribution.pulp_href) + + repo_version = self.pushrepository_api.read(distribution.repository).latest_version_href + latest_tag = self.tags_api.list(repository_version_added=repo_version).results[0] + assert latest_tag.name == self.image_v2s2_tag + + manifest_list = self.manifests_api.read(latest_tag.tagged_manifest) + assert manifest_list.media_type == MEDIA_TYPE.INDEX_OCI + assert manifest_list.schema_version == 2 + + referenced_manifests_digests = sorted( + [ + self.manifests_api.read(manifest_href).digest + for manifest_href in manifest_list.listed_manifests + ] + ) + assert referenced_manifests_digests == self.manifests_oci_digests + + def test_push_empty_manifest_list(self): + """Push an empty manifest list to the registry.""" + self.registry.login( + "-u", self.user_admin["username"], "-p", self.user_admin["password"], self.registry_name + ) + self.registry._dispatch_command( + "manifest", "push", self.empty_image_path, self.empty_image_local_url + ) + + distribution = self.distributions_api.list(name="foo").results[0] + self.addCleanup(self.distributions_api.delete, distribution.pulp_href) + + repo_version = self.pushrepository_api.read(distribution.repository).latest_version_href + latest_tag = self.tags_api.list(repository_version_added=repo_version).results[0] + assert latest_tag.name == self.empty_image_tag + + manifest_list = self.manifests_api.read(latest_tag.tagged_manifest) + # empty manifest lists are being pushed in the v2s2 format by default + assert manifest_list.media_type == MEDIA_TYPE.MANIFEST_LIST + assert manifest_list.schema_version == 2 + assert manifest_list.listed_manifests == []