Skip to content

Commit

Permalink
Add support for pushing manifest lists to the registry
Browse files Browse the repository at this point in the history
closes pulp#469
  • Loading branch information
lubosmj committed Mar 10, 2022
1 parent 2d3b895 commit 1843a04
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGES/469.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for pushing manifest lists via the Registry API.
8 changes: 4 additions & 4 deletions docs/workflows/push.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
Push content to a Repository
=============================

Users can push images to the repositories hosted by the Container Registry. It is possible to push
images that container foreign ( non-distributable) layers. Only the users who are logged in to the
registry are allowed to perform push operation. Find below a complete example of pushing a tagged
image.
Users can push images (manifests and manifest lists) to repositories hosted by the Container
Registry. It is possible to push images that container foreign (non-distributable) layers. Only the
users who are logged in to the registry are allowed to perform push operation. Find below a complete
example of pushing a tagged image.

.. note::
Having disabled the token authentication, only users with staff privileges (i.e.,
Expand Down
242 changes: 136 additions & 106 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import logging
import hashlib
import re
from collections import namedtuple
import time

from collections import namedtuple

from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
from tempfile import NamedTemporaryFile

Expand Down Expand Up @@ -69,7 +70,7 @@
RegistryPermission,
TokenPermission,
)
from pulp_container.app.utils import extract_data_from_signature
from pulp_container.app.utils import extract_data_from_signature, has_task_completed
from pulp_container.constants import (
EMPTY_BLOB,
SIGNATURE_API_EXTENSION_VERSION,
Expand Down Expand Up @@ -727,20 +728,8 @@ def put(self, request, path, pk=None):
},
)

# Wait a small amount of time
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return BlobResponse(blob, path, 201, request)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)
else:
raise Exception("The digest did not match")

Expand Down Expand Up @@ -844,83 +833,127 @@ def put(self, request, path, pk=None):
"""
Responds with the actual manifest
"""
_, repository = self.get_dr_push(request, path)
# when a user uploads a manifest list with zero listed manifests (no blobs were uploaded
# before) and the specified repository has not been created yet, create the repository
# without raising an error
create_new_repo = request.content_type in (
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
)
_, repository = self.get_dr_push(request, path, create=create_new_repo)
# iterate over all the layers and create
chunk = request.META["wsgi.input"]
artifact = self.receive_artifact(chunk)
manifest_digest = "sha256:{id}".format(id=artifact.sha256)
with storage.open(artifact.file.name) as artifact_file:
raw_data = artifact_file.read()
content_data = json.loads(raw_data)

# oci format might not contain mediaType in the manifest.json, docker should
# hence need to check request content type
if request.content_type not in (
models.MEDIA_TYPE.MANIFEST_V2,
models.MEDIA_TYPE.MANIFEST_OCI,
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
):
# we suport only v2 docker/oci schema upload
raise ManifestInvalid(digest=manifest_digest)
# both docker/oci format should contain config, digest, mediaType, size
config_layer = content_data.get("config")
config_media_type = config_layer.get("mediaType")
config_digest = config_layer.get("digest")
if config_media_type not in (
models.MEDIA_TYPE.CONFIG_BLOB,
models.MEDIA_TYPE.CONFIG_BLOB_OCI,

with storage.open(artifact.file.name) as artifact_file:
raw_data = artifact_file.read()

content_data = json.loads(raw_data)

if request.content_type in (
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
):
raise BlobInvalid(digest=config_digest)
try:
config_blob = models.Blob.objects.get(digest=config_digest)
except models.Blob.DoesNotExist:
raise BlobInvalid(digest=config_digest)

# both docker/oci format should contain layers, digest, media_type, size
layers = content_data.get("layers")
blobs = set()
for layer in layers:
media_type = layer.get("mediaType")
urls = layer.get("urls")
digest = layer.get("digest")
if (
media_type
in (
models.MEDIA_TYPE.FOREIGN_BLOB,
models.MEDIA_TYPE.FOREIGN_BLOB_OCI,
manifests = {}
for manifest in content_data.get("manifests"):
manifests[manifest["digest"]] = manifest["platform"]

digests = set(manifests.keys())
found_manifests = models.Manifest.objects.filter(digest__in=digests)

if (len(manifests) - found_manifests.count()) != 0:
ManifestInvalid(digest=manifest_digest)

manifest_list = self._save_manifest(artifact, manifest_digest, request.content_type)

manifests_to_list = []
for manifest in found_manifests:
platform = manifests[manifest.digest]
manifest_to_list = models.ManifestListManifest(
manifest_list=manifest,
image_manifest=manifest_list,
architecture=platform["architecture"],
os=platform["os"],
features=platform.get("features", ""),
variant=platform.get("variant", ""),
os_version=platform.get("os.version", ""),
os_features=platform.get("os.features", ""),
)
and not urls
manifests_to_list.append(manifest_to_list)

models.ManifestListManifest.objects.bulk_create(
manifests_to_list, ignore_conflicts=True, batch_size=1000
)
manifest = manifest_list
else:
# both docker/oci format should contain config, digest, mediaType, size
config_layer = content_data.get("config")
config_media_type = config_layer.get("mediaType")
config_digest = config_layer.get("digest")
if config_media_type not in (
models.MEDIA_TYPE.CONFIG_BLOB,
models.MEDIA_TYPE.CONFIG_BLOB_OCI,
):
raise BlobInvalid(digest=config_digest)

try:
config_blob = models.Blob.objects.get(
digest=config_digest, pk__in=repository.latest_version().content
)
except models.Blob.DoesNotExist:
raise BlobInvalid(digest=config_digest)

# both docker/oci format should contain layers, digest, media_type, size
layers = content_data.get("layers")
blobs = set()
for layer in layers:
media_type = layer.get("mediaType")
urls = layer.get("urls")
digest = layer.get("digest")
if (
media_type
in (
models.MEDIA_TYPE.FOREIGN_BLOB,
models.MEDIA_TYPE.FOREIGN_BLOB_OCI,
)
and not urls
):
raise ManifestInvalid(digest=manifest_digest)
if media_type not in (
models.MEDIA_TYPE.REGULAR_BLOB,
models.MEDIA_TYPE.REGULAR_BLOB_OCI,
):
raise BlobInvalid(digest=digest)
blobs.add(digest)

blobs_qs = models.Blob.objects.filter(
digest__in=blobs, pk__in=repository.latest_version().content
)
if (len(blobs) - blobs_qs.count()) != 0:
raise ManifestInvalid(digest=manifest_digest)
if media_type not in (
models.MEDIA_TYPE.REGULAR_BLOB,
models.MEDIA_TYPE.REGULAR_BLOB_OCI,
):
raise BlobInvalid(digest=digest)
blobs.add(digest)

manifest = models.Manifest(
digest=manifest_digest,
schema_version=2,
media_type=request.content_type,
config_blob=config_blob,
)
try:
manifest.save()
except IntegrityError:
manifest = models.Manifest.objects.get(digest=manifest.digest)
manifest.touch()
ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest)
try:
ca.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
blobs_qs = models.Blob.objects.filter(digest__in=blobs)
thru = []
for blob in blobs_qs:
thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob))
models.BlobManifest.objects.bulk_create(objs=thru, ignore_conflicts=True, batch_size=1000)
manifest = self._save_manifest(
artifact, manifest_digest, request.content_type, config_blob
)

thru = []
for blob in blobs_qs:
thru.append(models.BlobManifest(manifest=manifest, manifest_blob=blob))
models.BlobManifest.objects.bulk_create(
objs=thru, ignore_conflicts=True, batch_size=1000
)

tag = models.Tag(name=pk, tagged_manifest=manifest)
try:
tag.save()
Expand All @@ -941,20 +974,30 @@ def put(self, request, path, pk=None):
},
)

# Wait a small amount of time
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return ManifestResponse(manifest, path, request, status=201)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return ManifestResponse(manifest, path, request, status=201)

def _save_manifest(self, artifact, manifest_digest, content_type, config_blob=None):
manifest = models.Manifest(
digest=manifest_digest,
schema_version=2,
media_type=content_type,
config_blob=config_blob,
)
try:
manifest.save()
except IntegrityError:
manifest = models.Manifest.objects.get(digest=manifest.digest)
manifest.touch()
ca = ContentArtifact(artifact=artifact, content=manifest, relative_path=manifest.digest)
try:
ca.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
return manifest

def receive_artifact(self, chunk):
"""Handles assembling of Manifest as it's being uploaded."""
Expand Down Expand Up @@ -1078,18 +1121,5 @@ def put(self, request, path, pk):
},
)

# wait a small amount of time until a new repository version
# with the new signature is created
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return ManifestSignatureResponse(signature, path)
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
if has_task_completed(dispatched_task):
return ManifestSignatureResponse(signature, path)
32 changes: 32 additions & 0 deletions pulp_container/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import gnupg
import json
import logging
import time

from rest_framework.exceptions import Throttled

from pulpcore.plugin.models import Task

from pulp_container.constants import SIGNATURE_TYPE

Expand Down Expand Up @@ -99,3 +104,30 @@ def extract_data_from_signature(signature_raw, man_digest):
sig_json["signing_key_id"] = crypt_obj.key_id
sig_json["signature_timestamp"] = crypt_obj.timestamp
return sig_json


def has_task_completed(dispatched_task):
"""
Wait a couple of seconds until the task finishes its run.
Returns:
bool: True if the task ends successfully.
Raises:
Exception: If an error occurs during the task's runtime.
Throttled: If the task did not finish within a predefined timespan.
"""
for dummy in range(3):
time.sleep(1)
task = Task.objects.get(pk=dispatched_task.pk)
if task.state == "completed":
task.delete()
return True
elif task.state in ["waiting", "running"]:
continue
else:
error = task.error
task.delete()
raise Exception(str(error))
raise Throttled()
Loading

0 comments on commit 1843a04

Please sign in to comment.