From 2de3fafd10ed1c92f5a0a1bb0de01f3350f674bb Mon Sep 17 00:00:00 2001 From: Ina Panova Date: Wed, 25 May 2022 15:30:09 +0200 Subject: [PATCH] Improved image upload process. closes #797 Required PR: https://github.com/pulp/pulpcore/pull/2779 Required PR: https://github.com/pulp/pulpcore/pull/2842 --- CHANGES/797.bugfix | 3 + pulp_container/app/registry_api.py | 134 +++++++++++++++++------------ 2 files changed, 82 insertions(+), 55 deletions(-) create mode 100644 CHANGES/797.bugfix diff --git a/CHANGES/797.bugfix b/CHANGES/797.bugfix new file mode 100644 index 000000000..b1e91914f --- /dev/null +++ b/CHANGES/797.bugfix @@ -0,0 +1,3 @@ +Improved image upload process from podman/docker clients. +These clients send data as one big chunk hence we don't need to save it +as chunk but as an artifact directly. diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index 0bab722e2..39a1af05a 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -18,7 +18,7 @@ from tempfile import NamedTemporaryFile from django.core.files.storage import default_storage as storage -from django.core.files.base import ContentFile, File +from django.core.files.base import File from django.db import IntegrityError, transaction from django.shortcuts import get_object_or_404 @@ -547,11 +547,13 @@ class BlobUploads(ContainerRegistryApiMixin, ViewSet): model = models.Upload queryset = models.Upload.objects.all() - content_range_pattern = re.compile(r"^(?P<start>\d+)-(?P<end>\d+)$") + content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$") def create(self, request, path): """ Create a new upload. + + Note: We do not support monolithic upload. """ _, repository = self.get_dr_push(request, path, create=True) @@ -620,28 +622,45 @@ def partial_update(self, request, path, pk=None): Process a chunk that will be appended to an existing upload. 
""" _, repository = self.get_dr_push(request, path) + upload = get_object_or_404(models.Upload, repository=repository, pk=pk) chunk = request.META["wsgi.input"] - if "Content-Range" in request.headers or "digest" not in request.query_params: - whole = False - else: - whole = True + if range_header := request.headers.get("Content-Range"): + found = self.content_range_pattern.match(range_header) + start = int(found.group(1)) + end = int(found.group(2)) + length = end - start + 1 - if whole: - start = 0 else: - content_range = request.META.get("HTTP_CONTENT_RANGE", "") - match = self.content_range_pattern.match(content_range) - start = 0 if not match else int(match.group("start")) - - upload = get_object_or_404(models.Upload, repository=repository, pk=pk) + # is this header always present? + length = int(request.headers["Content-Length"]) + start = 0 - chunk = ContentFile(chunk.read()) with transaction.atomic(): if upload.size != start: raise Exception - upload.append(chunk, upload.size) - upload.size += chunk.size + # if more chunks + if range_header: + upload.append(chunk, upload.size) + else: + # 1 chunk + # do not add to the upload, create artifact right away + with NamedTemporaryFile("ab") as temp_file: + temp_file.write(chunk.read()) + temp_file.flush() + + uploaded_file = PulpTemporaryUploadedFile.from_file( + File(open(temp_file.name, "rb")) + ) + try: + artifact = Artifact.init_and_validate(uploaded_file) + artifact.save() + except IntegrityError: + artifact = Artifact.objects.get(sha256=artifact.sha256) + artifact.touch() + # can use artifact.size instead + + upload.size += length upload.save() return UploadResponse(upload=upload, path=path, request=request, status=204) @@ -649,6 +668,8 @@ def partial_update(self, request, path, pk=None): def put(self, request, path, pk=None): """ Create a blob from uploaded chunks. + + Note: We do not support monolithic upload. 
""" _, repository = self.get_dr_push(request, path) @@ -684,54 +705,57 @@ def put(self, request, path, pk=None): chunks = UploadChunk.objects.filter(upload=upload).order_by("offset") - with NamedTemporaryFile("ab") as temp_file: - for chunk in chunks: - temp_file.write(chunk.file.read()) - chunk.file.close() - temp_file.flush() + if chunks.exists(): - uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb"))) + with NamedTemporaryFile("ab") as temp_file: + for chunk in chunks: + temp_file.write(chunk.file.read()) + chunk.file.close() + temp_file.flush() - if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]: + uploaded_file = PulpTemporaryUploadedFile.from_file( + File(open(temp_file.name, "rb")) + ) + if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]: + raise Exception("The digest did not match") try: artifact = Artifact.init_and_validate(uploaded_file) artifact.save() except IntegrityError: artifact = Artifact.objects.get(sha256=artifact.sha256) artifact.touch() - try: - blob = models.Blob(digest=digest) - blob.save() - except IntegrityError: - blob = models.Blob.objects.get(digest=digest) - blob.touch() - try: - blob_artifact = ContentArtifact( - artifact=artifact, content=blob, relative_path=digest - ) - blob_artifact.save() - except IntegrityError: - ca = ContentArtifact.objects.get(content=blob, relative_path=digest) - if not ca.artifact: - ca.artifact = artifact - ca.save(update_fields=["artifact"]) - upload.delete() - - dispatched_task = dispatch( - add_and_remove, - shared_resources=[f"upload:{pk}"], - exclusive_resources=[repository], - kwargs={ - "repository_pk": str(repository.pk), - "add_content_units": [str(blob.pk)], - "remove_content_units": [], - }, - ) - - if has_task_completed(dispatched_task): - return BlobResponse(blob, path, 201, request) else: - raise Exception("The digest did not match") + artifact = Artifact.objects.get(sha256=digest[len("sha256:") :]) + 
artifact.touch() + try: + blob = models.Blob(digest=digest) + blob.save() + except IntegrityError: + blob = models.Blob.objects.get(digest=digest) + blob.touch() + try: + blob_artifact = ContentArtifact(artifact=artifact, content=blob, relative_path=digest) + blob_artifact.save() + except IntegrityError: + ca = ContentArtifact.objects.get(content=blob, relative_path=digest) + if not ca.artifact: + ca.artifact = artifact + ca.save(update_fields=["artifact"]) + upload.delete() + + dispatched_task = dispatch( + add_and_remove, + shared_resources=[f"upload:{pk}"], + exclusive_resources=[repository], + kwargs={ + "repository_pk": str(repository.pk), + "add_content_units": [str(blob.pk)], + "remove_content_units": [], + }, + ) + + if has_task_completed(dispatched_task): + return BlobResponse(blob, path, 201, request) class RedirectsMixin: