Skip to content

Commit

Permalink
Add support for pushing manifest lists to the registry
Browse files Browse the repository at this point in the history
closes pulp#469
  • Loading branch information
lubosmj committed Feb 21, 2022
1 parent 5fba1f2 commit b8d087c
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGES/469.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for pushing manifest lists via the Registry API.
6 changes: 3 additions & 3 deletions docs/workflows/push.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
Push content to a Repository
=============================

Users can push images to the repositories hosted by the Container Registry. Only the users who are
logged in to the registry are allowed to perform such an action. Find below a complete example of
pushing a tagged image.
Users can push manifests and manifest lists to the repositories hosted by the Container Registry.
Only the users who are logged in to the registry are allowed to perform such an action. Find below
a complete example of pushing a tagged image.

.. note::
Having disabled the token authentication, only users with staff privileges (i.e.,
Expand Down
249 changes: 142 additions & 107 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
RegistryPermission,
TokenPermission,
)
from pulp_container.app.utils import extract_data_from_signature
from pulp_container.app.utils import extract_data_from_signature, has_task_completed
from pulp_container.constants import (
EMPTY_BLOB,
SIGNATURE_API_EXTENSION_VERSION,
Expand Down Expand Up @@ -671,20 +671,8 @@ def put(self, request, path, pk=None):
},
)

# Wait a small amount of time
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return BlobResponse(blob, path, 201, request)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)
else:
raise Exception("The digest did not match")

Expand Down Expand Up @@ -788,7 +776,10 @@ def put(self, request, path, pk=None):
"""
Responds with the actual manifest
"""
_, repository = self.get_dr_push(request, path)
# when a user uploads a manifest list with zero listed manifests (no blobs were uploaded)
# and the specified repository has not been created yet, create the repository without
# raising an error
_, repository = self.get_dr_push(request, path, create=True)
# iterate over all the layers and create
chunk = request.META["wsgi.input"]
artifact = self.receive_artifact(chunk)
Expand All @@ -801,90 +792,147 @@ def put(self, request, path, pk=None):
if request.content_type not in (
models.MEDIA_TYPE.MANIFEST_V2,
models.MEDIA_TYPE.MANIFEST_OCI,
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
):
# we suport only v2 docker/oci schema upload
raise ManifestInvalid(digest=manifest_digest)
# both docker/oci format should contain config, digest, mediaType, size
config_layer = content_data.get("config")
try:
config_digest = config_layer.get("digest")
config_blob = models.Blob.objects.get(digest=config_digest)
except models.Blob.DoesNotExist:
raise BlobNotFound(digest=config_digest)
config_media_type = config_layer.get("mediaType")
if config_media_type not in (
models.MEDIA_TYPE.CONFIG_BLOB,
models.MEDIA_TYPE.CONFIG_BLOB_OCI,

if request.content_type in (
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
):
raise BlobInvalid(digest=config_blob.digest)
manifest_list = models.Manifest(
digest=manifest_digest,
schema_version=content_data["schemaVersion"],
media_type=content_data.get("mediaType", models.MEDIA_TYPE.INDEX_OCI),
)
try:
manifest_list.save()
except IntegrityError:
manifest_list = models.Manifest.objects.get(digest=manifest_list.digest)
manifest_list.touch()

manifest = models.Manifest(
digest=manifest_digest,
schema_version=2,
media_type=request.content_type,
config_blob=config_blob,
)
try:
manifest.save()
except IntegrityError:
manifest = models.Manifest.objects.get(digest=manifest.digest)
manifest.touch()
ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest)
try:
ca.save()
except IntegrityError:
pass
# both docker/oci format should contain layers, digest, media_type, size
layers = content_data.get("layers")
blobs = {}
for layer in layers:
blobs[layer.get("digest")] = layer.get("mediaType")
blobs_qs = models.Blob.objects.filter(digest__in=blobs.keys())
thru = []
for blob in blobs_qs:
# ensure there are no foreign layers
blob_media_type = blobs[blob.digest]
if blob_media_type not in (
models.MEDIA_TYPE.REGULAR_BLOB,
models.MEDIA_TYPE.REGULAR_BLOB_OCI,
ca = ContentArtifact(
artifact=artifact, content=manifest_list, relative_path=manifest_list.digest
)
try:
ca.save()
except IntegrityError:
pass

manifests_to_list = []
for manifest_metadata in content_data.get("manifests"):
manifest = models.Manifest.objects.get(digest=manifest_metadata["digest"])

platform = manifest_metadata["platform"]
manifest_to_list = models.ManifestListManifest(
manifest_list=manifest,
image_manifest=manifest_list,
architecture=platform["architecture"],
os=platform["os"],
features=platform.get("features", ""),
variant=platform.get("variant", ""),
os_version=platform.get("os.version", ""),
os_features=platform.get("os.features", ""),
)
manifests_to_list.append(manifest_to_list)

models.ManifestListManifest.objects.bulk_create(
manifests_to_list, ignore_conflicts=True, batch_size=1000
)

tag = models.Tag(name=pk, tagged_manifest=manifest_list)
try:
tag.save()
except IntegrityError:
tag = models.Tag.objects.get(name=tag.name, tagged_manifest=manifest_list)
tag.touch()

tags_to_remove = models.Tag.objects.filter(
pk__in=repository.latest_version().content.all(), name=tag
).exclude(tagged_manifest=manifest_list)
dispatched_task = dispatch(
add_and_remove,
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(tag.pk), str(manifest_list.pk)],
"remove_content_units": [str(pk) for pk in tags_to_remove.values_list("pk")],
},
)

if has_task_completed(dispatched_task):
return ManifestResponse(manifest_list, path, request, status=201)
else:
# both docker/oci format should contain config, digest, mediaType, size
config_layer = content_data.get("config")
try:
config_digest = config_layer.get("digest")
config_blob = models.Blob.objects.get(digest=config_digest)
except models.Blob.DoesNotExist:
raise BlobNotFound(digest=config_digest)
config_media_type = config_layer.get("mediaType")
if config_media_type not in (
models.MEDIA_TYPE.CONFIG_BLOB,
models.MEDIA_TYPE.CONFIG_BLOB_OCI,
):
raise BlobInvalid(digest=blob.digest)
thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob))
models.BlobManifest.objects.bulk_create(objs=thru, ignore_conflicts=True, batch_size=1000)
tag = models.Tag(name=pk, tagged_manifest=manifest)
try:
tag.save()
except IntegrityError:
tag = models.Tag.objects.get(name=tag.name, tagged_manifest=manifest)
tag.touch()
raise BlobInvalid(digest=config_blob.digest)

tags_to_remove = models.Tag.objects.filter(
pk__in=repository.latest_version().content.all(), name=tag
).exclude(tagged_manifest=manifest)
dispatched_task = dispatch(
add_and_remove,
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(tag.pk), str(manifest.pk)],
"remove_content_units": [str(pk) for pk in tags_to_remove.values_list("pk")],
},
)
manifest = models.Manifest(
digest=manifest_digest,
schema_version=2,
media_type=request.content_type,
config_blob=config_blob,
)
try:
manifest.save()
except IntegrityError:
manifest = models.Manifest.objects.get(digest=manifest.digest)
manifest.touch()
ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest)
try:
ca.save()
except IntegrityError:
pass
# both docker/oci format should contain layers, digest, media_type, size
layers = content_data.get("layers")
blobs = {}
for layer in layers:
blobs[layer.get("digest")] = layer.get("mediaType")
blobs_qs = models.Blob.objects.filter(digest__in=blobs.keys())
thru = []
for blob in blobs_qs:
# ensure there are no foreign layers
blob_media_type = blobs[blob.digest]
if blob_media_type not in (
models.MEDIA_TYPE.REGULAR_BLOB,
models.MEDIA_TYPE.REGULAR_BLOB_OCI,
):
raise BlobInvalid(digest=blob.digest)
thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob))
models.BlobManifest.objects.bulk_create(objs=thru, ignore_conflicts=True, batch_size=1000)
tag = models.Tag(name=pk, tagged_manifest=manifest)
try:
tag.save()
except IntegrityError:
tag = models.Tag.objects.get(name=tag.name, tagged_manifest=manifest)
tag.touch()

# Wait a small amount of time
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
tags_to_remove = models.Tag.objects.filter(
pk__in=repository.latest_version().content.all(), name=tag
).exclude(tagged_manifest=manifest)
dispatched_task = dispatch(
add_and_remove,
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(tag.pk), str(manifest.pk)],
"remove_content_units": [str(pk) for pk in tags_to_remove.values_list("pk")],
},
)

if has_task_completed(dispatched_task):
return ManifestResponse(manifest, path, request, status=201)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()

def receive_artifact(self, chunk):
"""Handles assembling of Manifest as it's being uploaded."""
Expand Down Expand Up @@ -1008,18 +1056,5 @@ def put(self, request, path, pk):
},
)

# wait a small amount of time until a new repository version
# with the new signature is created
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return ManifestSignatureResponse(signature, path)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return ManifestSignatureResponse(signature, path)
31 changes: 31 additions & 0 deletions pulp_container/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import gnupg
import json
import logging
import time

from rest_framework.exceptions import Throttled

from pulpcore.plugin.models import Task

from pulp_container.constants import SIGNATURE_TYPE

Expand Down Expand Up @@ -99,3 +104,29 @@ def extract_data_from_signature(signature_raw, man_digest):
sig_json["signing_key_id"] = crypt_obj.key_id
sig_json["signature_timestamp"] = crypt_obj.timestamp
return sig_json


def has_task_completed(dispatched_task):
"""
Wait a couple of seconds until the task finishes its run.
Returns:
bool: True if the task ends successfully.
Raises:
Exception: If an error occurs during the task's runtime.
Throttled: If the task did not finish within a predefined timespan.
"""
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return True
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
Loading

0 comments on commit b8d087c

Please sign in to comment.