Skip to content

Commit

Permalink
Add support for pushing manifest lists to the registry
Browse files Browse the repository at this point in the history
closes pulp#469
  • Loading branch information
lubosmj committed Mar 7, 2022
1 parent 2d3b895 commit 40641f7
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGES/469.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for pushing manifest lists via the Registry API.
8 changes: 4 additions & 4 deletions docs/workflows/push.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
Push content to a Repository
=============================

Users can push images to the repositories hosted by the Container Registry. It is possible to push
images that container foreign ( non-distributable) layers. Only the users who are logged in to the
registry are allowed to perform push operation. Find below a complete example of pushing a tagged
image.
Users can push images (manifests and manifest lists) to repositories hosted by the Container
Registry. It is possible to push images that contain foreign (non-distributable) layers. Only
users who are logged in to the registry are allowed to perform a push operation. Find below a
complete example of pushing a tagged image.

.. note::
Having disabled the token authentication, only users with staff privileges (i.e.,
Expand Down
243 changes: 136 additions & 107 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import hashlib
import re
from collections import namedtuple
import time

from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -69,7 +68,7 @@
RegistryPermission,
TokenPermission,
)
from pulp_container.app.utils import extract_data_from_signature
from pulp_container.app.utils import extract_data_from_signature, has_task_completed
from pulp_container.constants import (
EMPTY_BLOB,
SIGNATURE_API_EXTENSION_VERSION,
Expand Down Expand Up @@ -727,20 +726,8 @@ def put(self, request, path, pk=None):
},
)

# Wait a small amount of time
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return BlobResponse(blob, path, 201, request)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)
else:
raise Exception("The digest did not match")

Expand Down Expand Up @@ -844,83 +831,128 @@ def put(self, request, path, pk=None):
"""
Responds with the actual manifest
"""
_, repository = self.get_dr_push(request, path)
# when a user uploads a manifest list with zero listed manifests (no blobs were uploaded)
# and the specified repository has not been created yet, create the repository without
# raising an error
create_new_repo = request.content_type in (
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
)
distribution, repository = self.get_dr_push(request, path, create=create_new_repo)
# iterate over all the layers and create
chunk = request.META["wsgi.input"]
artifact = self.receive_artifact(chunk)
manifest_digest = "sha256:{id}".format(id=artifact.sha256)
with storage.open(artifact.file.name) as artifact_file:
raw_data = artifact_file.read()
content_data = json.loads(raw_data)

# oci format might not contain mediaType in the manifest.json, docker should
# hence need to check request content type
if request.content_type not in (
models.MEDIA_TYPE.MANIFEST_V2,
models.MEDIA_TYPE.MANIFEST_OCI,
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
):
# we suport only v2 docker/oci schema upload
raise ManifestInvalid(digest=manifest_digest)
# both docker/oci format should contain config, digest, mediaType, size
config_layer = content_data.get("config")
config_media_type = config_layer.get("mediaType")
config_digest = config_layer.get("digest")
if config_media_type not in (
models.MEDIA_TYPE.CONFIG_BLOB,
models.MEDIA_TYPE.CONFIG_BLOB_OCI,

with storage.open(artifact.file.name) as artifact_file:
raw_data = artifact_file.read()

content_data = json.loads(raw_data)

if request.content_type in (
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
):
raise BlobInvalid(digest=config_digest)
try:
config_blob = models.Blob.objects.get(digest=config_digest)
except models.Blob.DoesNotExist:
raise BlobInvalid(digest=config_digest)

# both docker/oci format should contain layers, digest, media_type, size
layers = content_data.get("layers")
blobs = set()
for layer in layers:
media_type = layer.get("mediaType")
urls = layer.get("urls")
digest = layer.get("digest")
if (
media_type
in (
models.MEDIA_TYPE.FOREIGN_BLOB,
models.MEDIA_TYPE.FOREIGN_BLOB_OCI,
manifest_list = self._save_manifest(artifact, manifest_digest, request)

manifests = {}
for manifest in content_data.get("manifests"):
manifests[manifest["digest"]] = manifest["platform"]

digests = set(manifests.keys())
found_manifests = models.Manifest.objects.filter(digest__in=digests)

found_digests = set(found_manifests.values_list("digest", flat=True))
digests_not_found = digests - found_digests
if digests_not_found:
ManifestNotFound(reference=digests_not_found)

manifests_to_list = []
for manifest in found_manifests:
platform = manifests[manifest.digest]
manifest_to_list = models.ManifestListManifest(
manifest_list=manifest,
image_manifest=manifest_list,
architecture=platform["architecture"],
os=platform["os"],
features=platform.get("features", ""),
variant=platform.get("variant", ""),
os_version=platform.get("os.version", ""),
os_features=platform.get("os.features", ""),
)
and not urls
):
raise ManifestInvalid(digest=manifest_digest)
if media_type not in (
models.MEDIA_TYPE.REGULAR_BLOB,
models.MEDIA_TYPE.REGULAR_BLOB_OCI,
manifests_to_list.append(manifest_to_list)

models.ManifestListManifest.objects.bulk_create(
manifests_to_list, ignore_conflicts=True, batch_size=1000
)
manifest = manifest_list
else:
# both docker/oci format should contain config, digest, mediaType, size
config_layer = content_data.get("config")
config_media_type = config_layer.get("mediaType")
config_digest = config_layer.get("digest")
if config_media_type not in (
models.MEDIA_TYPE.CONFIG_BLOB,
models.MEDIA_TYPE.CONFIG_BLOB_OCI,
):
raise BlobInvalid(digest=digest)
blobs.add(digest)
raise BlobInvalid(digest=config_digest)

try:
version = (
distribution.repository_version or distribution.repository.latest_version()
)
except AttributeError:
# the distribution does not contain reference to repository version or repository
raise RepositoryNotFound(name=path)

try:
config_blob = models.Blob.objects.get(digest=config_digest, pk__in=version.content)
except models.Blob.DoesNotExist:
raise BlobInvalid(digest=config_digest)

# both docker/oci format should contain layers, digest, media_type, size
layers = content_data.get("layers")
blobs = set()
for layer in layers:
media_type = layer.get("mediaType")
urls = layer.get("urls")
digest = layer.get("digest")
if (
media_type
in (
models.MEDIA_TYPE.FOREIGN_BLOB,
models.MEDIA_TYPE.FOREIGN_BLOB_OCI,
)
and not urls
):
raise ManifestInvalid(digest=manifest_digest)
if media_type not in (
models.MEDIA_TYPE.REGULAR_BLOB,
models.MEDIA_TYPE.REGULAR_BLOB_OCI,
):
raise BlobInvalid(digest=digest)
blobs.add(digest)

manifest = self._save_manifest(artifact, manifest_digest, request, config_blob)

blobs_qs = models.Blob.objects.filter(digest__in=blobs, pk__in=version.content)
thru = []
for blob in blobs_qs:
thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob))
models.BlobManifest.objects.bulk_create(
objs=thru, ignore_conflicts=True, batch_size=1000
)

manifest = models.Manifest(
digest=manifest_digest,
schema_version=2,
media_type=request.content_type,
config_blob=config_blob,
)
try:
manifest.save()
except IntegrityError:
manifest = models.Manifest.objects.get(digest=manifest.digest)
manifest.touch()
ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest)
try:
ca.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
blobs_qs = models.Blob.objects.filter(digest__in=blobs)
thru = []
for blob in blobs_qs:
thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob))
models.BlobManifest.objects.bulk_create(objs=thru, ignore_conflicts=True, batch_size=1000)
tag = models.Tag(name=pk, tagged_manifest=manifest)
try:
tag.save()
Expand All @@ -941,20 +973,30 @@ def put(self, request, path, pk=None):
},
)

# Wait a small amount of time
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return ManifestResponse(manifest, path, request, status=201)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return ManifestResponse(manifest, path, request, status=201)

def _save_manifest(self, artifact, manifest_digest, request, config_blob=None):
manifest = models.Manifest(
digest=manifest_digest,
schema_version=2,
media_type=request.content_type,
config_blob=config_blob,
)
try:
manifest.save()
except IntegrityError:
manifest = models.Manifest.objects.get(digest=manifest.digest)
manifest.touch()
ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest)
try:
ca.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
return manifest

def receive_artifact(self, chunk):
"""Handles assembling of Manifest as it's being uploaded."""
Expand Down Expand Up @@ -1078,18 +1120,5 @@ def put(self, request, path, pk):
},
)

# wait a small amount of time until a new repository version
# with the new signature is created
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return ManifestSignatureResponse(signature, path)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return ManifestSignatureResponse(signature, path)
32 changes: 32 additions & 0 deletions pulp_container/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import gnupg
import json
import logging
import time

from rest_framework.exceptions import Throttled

from pulpcore.plugin.models import Task

from pulp_container.constants import SIGNATURE_TYPE

Expand Down Expand Up @@ -99,3 +104,30 @@ def extract_data_from_signature(signature_raw, man_digest):
sig_json["signing_key_id"] = crypt_obj.key_id
sig_json["signature_timestamp"] = crypt_obj.timestamp
return sig_json


def has_task_completed(dispatched_task):
    """
    Poll the dispatched task until it leaves the pending states or the attempts run out.

    The task is looked up by primary key at most three times, with a one-second sleep before
    each lookup. Once the task is seen outside of the "waiting" and "running" states, its
    record is deleted.

    Args:
        dispatched_task: The task to watch; only its ``pk`` is used for the lookups.

    Returns:
        bool: True if the task reached the "completed" state.

    Raises:
        Exception: If the task ended in any other state; the message is the string form of the
            task's ``error``.
        Throttled: If the task was still "waiting" or "running" after the last attempt.
    """
    pending_states = ("waiting", "running")
    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        time.sleep(1)
        current = Task.objects.get(pk=dispatched_task.pk)
        if current.state in pending_states:
            continue
        if current.state == "completed":
            current.delete()
            return True
        # any other state is treated as a failure; keep the error before removing the record
        failure = current.error
        current.delete()
        raise Exception(str(failure))
    raise Throttled()
Loading

0 comments on commit 40641f7

Please sign in to comment.