From aec1408ee7c2e759656b17be0ebcfd188d972fab Mon Sep 17 00:00:00 2001
From: Ina Panova
Date: Wed, 25 May 2022 15:30:09 +0200
Subject: [PATCH] Improved image upload process.

closes #797

Required PR: https://github.com/pulp/pulpcore/pull/2779
---
 CHANGES/797.bugfix                            |   3 +
 pulp_container/app/exceptions.py              |  19 +++
 .../app/migrations/0032_upload_artifact.py    |  20 +++
 pulp_container/app/models.py                  |   4 +
 pulp_container/app/registry_api.py            | 115 +++++++++++-------
 5 files changed, 119 insertions(+), 42 deletions(-)
 create mode 100644 CHANGES/797.bugfix
 create mode 100644 pulp_container/app/migrations/0032_upload_artifact.py

diff --git a/CHANGES/797.bugfix b/CHANGES/797.bugfix
new file mode 100644
index 000000000..b1e91914f
--- /dev/null
+++ b/CHANGES/797.bugfix
@@ -0,0 +1,3 @@
+Improved the image upload process from podman/docker clients.
+These clients send data as one big chunk, hence we don't need to save it
+as a chunk but as an artifact directly.
diff --git a/pulp_container/app/exceptions.py b/pulp_container/app/exceptions.py
index 16431a925..d81449edd 100644
--- a/pulp_container/app/exceptions.py
+++ b/pulp_container/app/exceptions.py
@@ -120,3 +120,22 @@ def __init__(self, digest):
             ]
         }
     )
+
+
+class InvalidRequest(ParseError):
+    """An exception to render an HTTP 400 response."""
+
+    def __init__(self, message):
+        """Initialize the exception with an optional custom error message."""
+        message = message or "Invalid request."
+        super().__init__(
+            detail={
+                "errors": [
+                    {
+                        "code": "INVALID_REQUEST",
+                        "message": message,
+                        "detail": {},
+                    }
+                ]
+            }
+        )
diff --git a/pulp_container/app/migrations/0032_upload_artifact.py b/pulp_container/app/migrations/0032_upload_artifact.py
new file mode 100644
index 000000000..10dacc151
--- /dev/null
+++ b/pulp_container/app/migrations/0032_upload_artifact.py
@@ -0,0 +1,20 @@
+# Generated by Django 3.2.13 on 2022-06-23 11:48
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0091_systemid'),
+        ('container', '0031_replace_charf_with_textf'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='upload',
+            name='artifact',
+            field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='uploads', to='core.artifact'),
+        ),
+    ]
diff --git a/pulp_container/app/models.py b/pulp_container/app/models.py
index a4c5cfc43..907c19bdb 100644
--- a/pulp_container/app/models.py
+++ b/pulp_container/app/models.py
@@ -15,6 +15,7 @@
 
 from pulpcore.plugin.download import DownloaderFactory
 from pulpcore.plugin.models import (
+    Artifact,
     AutoAddObjPermsMixin,
     BaseModel,
     Content,
@@ -633,3 +634,6 @@ class Upload(CoreUpload):
     """
 
     repository = models.ForeignKey(Repository, related_name="uploads", on_delete=models.CASCADE)
+    artifact = models.ForeignKey(
+        Artifact, related_name="uploads", null=True, on_delete=models.SET_NULL
+    )
diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py
index 2054e8b7d..2685daf94 100644
--- a/pulp_container/app/registry_api.py
+++ b/pulp_container/app/registry_api.py
@@ -51,6 +51,7 @@
 from pulp_container.app.authorization import AuthorizationService
 from pulp_container.app.cache import find_base_path_cached, RegistryApiCache
 from pulp_container.app.exceptions import (
+    InvalidRequest,
     RepositoryNotFound,
     RepositoryInvalid,
     BlobNotFound,
@@ -547,11 +548,13 @@ class BlobUploads(ContainerRegistryApiMixin, ViewSet):
     model = models.Upload
     queryset = models.Upload.objects.all()
 
-    content_range_pattern = re.compile(r"^(?P<start>\d+)-(?P<end>\d+)$")
+    content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$")
 
     def create(self, request, path):
         """
         Create a new upload.
+
+        Note: We do not support monolithic upload.
         """
         _, repository = self.get_dr_push(request, path, create=True)
 
@@ -620,28 +623,47 @@ def partial_update(self, request, path, pk=None):
         Process a chunk that will be appended to an existing upload.
         """
         _, repository = self.get_dr_push(request, path)
+        upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
         chunk = request.META["wsgi.input"]
-        if "Content-Range" in request.headers or "digest" not in request.query_params:
-            whole = False
-        else:
-            whole = True
+        if range_header := request.headers.get("Content-Range"):
+            found = self.content_range_pattern.match(range_header)
+            if not found:
+                raise InvalidRequest(message="Invalid range header")
+            start = int(found.group(1))
+            end = int(found.group(2))
+            length = end - start + 1
 
-        if whole:
-            start = 0
         else:
-            content_range = request.META.get("HTTP_CONTENT_RANGE", "")
-            match = self.content_range_pattern.match(content_range)
-            start = 0 if not match else int(match.group("start"))
-
-        upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
+            length = int(request.headers["Content-Length"])
+            start = 0
 
-        chunk = ContentFile(chunk.read())
         with transaction.atomic():
             if upload.size != start:
                 raise Exception
 
-            upload.append(chunk, upload.size)
-            upload.size += chunk.size
+            # if more chunks
+            if range_header:
+                chunk = ContentFile(chunk.read())
+                upload.append(chunk, upload.size)
+            else:
+                # 1 chunk
+                # do not add to the upload, create artifact right away
+                with NamedTemporaryFile("ab") as temp_file:
+                    temp_file.write(chunk.read())
+                    temp_file.flush()
+
+                    uploaded_file = PulpTemporaryUploadedFile.from_file(
+                        File(open(temp_file.name, "rb"))
+                    )
+                try:
+                    artifact = Artifact.init_and_validate(uploaded_file)
+                    artifact.save()
+                except IntegrityError:
+                    artifact = Artifact.objects.get(sha256=artifact.sha256)
+                    artifact.touch()
+                upload.artifact = artifact
+
+            upload.size += length
             upload.save()
 
         return UploadResponse(upload=upload, path=path, request=request, status=204)
@@ -649,6 +671,8 @@
     def put(self, request, path, pk=None):
        """
         Create a blob from uploaded chunks.
+
+        Note: We do not support monolithic upload.
""" _, repository = self.get_dr_push(request, path) @@ -682,23 +706,32 @@ def put(self, request, path, pk=None): task.delete() raise Exception(str(error)) - chunks = UploadChunk.objects.filter(upload=upload).order_by("offset") - - with NamedTemporaryFile("ab") as temp_file: - for chunk in chunks: - temp_file.write(chunk.file.read()) - chunk.file.close() - temp_file.flush() - - uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb"))) - - if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]: + if artifact := upload.artifact: + if artifact.sha256 != digest[len("sha256:") :]: + raise Exception("The digest did not match") + artifact.touch() + else: + chunks = UploadChunk.objects.filter(upload=upload).order_by("offset") + with NamedTemporaryFile("ab") as temp_file: + for chunk in chunks: + temp_file.write(chunk.file.read()) + chunk.file.close() + temp_file.flush() + + uploaded_file = PulpTemporaryUploadedFile.from_file( + File(open(temp_file.name, "rb")) + ) + if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]: + upload.delete() + raise Exception("The digest did not match") try: artifact = Artifact.init_and_validate(uploaded_file) artifact.save() except IntegrityError: artifact = Artifact.objects.get(sha256=artifact.sha256) artifact.touch() + + with transaction.atomic(): try: blob = models.Blob(digest=digest) blob.save() @@ -715,23 +748,21 @@ def put(self, request, path, pk=None): if not ca.artifact: ca.artifact = artifact ca.save(update_fields=["artifact"]) - upload.delete() - - dispatched_task = dispatch( - add_and_remove, - shared_resources=[f"upload:{pk}"], - exclusive_resources=[repository], - kwargs={ - "repository_pk": str(repository.pk), - "add_content_units": [str(blob.pk)], - "remove_content_units": [], - }, - ) + upload.delete() - if has_task_completed(dispatched_task): - return BlobResponse(blob, path, 201, request) - else: - raise Exception("The digest did not match") + dispatched_task = dispatch( + add_and_remove, + shared_resources=[f"upload:{pk}"], + exclusive_resources=[repository], + kwargs={ + "repository_pk": str(repository.pk), + "add_content_units": [str(blob.pk)], + "remove_content_units": [], + }, + ) + + if has_task_completed(dispatched_task): + return BlobResponse(blob, path, 201, request) class RedirectsMixin: