diff --git a/CHANGES/797.bugfix b/CHANGES/797.bugfix new file mode 100644 index 000000000..b1e91914f --- /dev/null +++ b/CHANGES/797.bugfix @@ -0,0 +1,3 @@ +Improved image upload process from podman/docker clients. +These clients send data as one big chunk hence we don't need to save it +as chunk but as an artifact directly. diff --git a/pulp_container/app/migrations/0032_upload_artifact.py b/pulp_container/app/migrations/0032_upload_artifact.py new file mode 100644 index 000000000..9aeec0afd --- /dev/null +++ b/pulp_container/app/migrations/0032_upload_artifact.py @@ -0,0 +1,20 @@ +# Generated by Django 3.2.13 on 2022-06-23 10:35 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0092_alter_upload_options'), + ('container', '0031_replace_charf_with_textf'), + ] + + operations = [ + migrations.AddField( + model_name='upload', + name='artifact', + field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.PROTECT, related_name='uploads', to='core.artifact'), + ), + ] diff --git a/pulp_container/app/models.py b/pulp_container/app/models.py index a4c5cfc43..4c108e7c0 100644 --- a/pulp_container/app/models.py +++ b/pulp_container/app/models.py @@ -15,6 +15,7 @@ from pulpcore.plugin.download import DownloaderFactory from pulpcore.plugin.models import ( + Artifact, AutoAddObjPermsMixin, BaseModel, Content, @@ -633,3 +634,4 @@ class Upload(CoreUpload): """ repository = models.ForeignKey(Repository, related_name="uploads", on_delete=models.CASCADE) + artifact = models.ForeignKey(Artifact, related_name="uploads", null=True, on_delete=models.PROTECT) diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index 0bab722e2..067e4d05f 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -547,11 +547,13 @@ class BlobUploads(ContainerRegistryApiMixin, ViewSet): model = models.Upload queryset = models.Upload.objects.all() - content_range_pattern = re.compile(r"^(?P\d+)-(?P\d+)$") + content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$") def create(self, request, path): """ Create a new upload. + + Note: We do not support monolithic upload. """ _, repository = self.get_dr_push(request, path, create=True) @@ -620,28 +622,47 @@ def partial_update(self, request, path, pk=None): Process a chunk that will be appended to an existing upload. """ _, repository = self.get_dr_push(request, path) + upload = get_object_or_404(models.Upload, repository=repository, pk=pk) chunk = request.META["wsgi.input"] - if "Content-Range" in request.headers or "digest" not in request.query_params: - whole = False - else: - whole = True + if range_header := request.headers.get("Content-Range"): + found = self.content_range_pattern.match(range_header) + start = int(found.group(1)) + end = int(found.group(2)) + length = end - start + 1 - if whole: - start = 0 else: - content_range = request.META.get("HTTP_CONTENT_RANGE", "") - match = self.content_range_pattern.match(content_range) - start = 0 if not match else int(match.group("start")) - - upload = get_object_or_404(models.Upload, repository=repository, pk=pk) + # is this header always present? + length = int(request.headers["Content-Length"]) + start = 0 - chunk = ContentFile(chunk.read()) with transaction.atomic(): if upload.size != start: raise Exception - upload.append(chunk, upload.size) - upload.size += chunk.size + # if more chunks + if range_header: + chunk = ContentFile(chunk.read()) + upload.append(chunk, upload.size) + else: + # 1 chunk + # do not add to the upload, create artifact right away + with NamedTemporaryFile("ab") as temp_file: + temp_file.write(chunk.read()) + temp_file.flush() + + uploaded_file = PulpTemporaryUploadedFile.from_file( + File(open(temp_file.name, "rb")) + ) + try: + artifact = Artifact.init_and_validate(uploaded_file) + artifact.save() + except IntegrityError: + artifact = Artifact.objects.get(sha256=artifact.sha256) + artifact.touch() + upload.artifact = artifact + # can use artifact.size instead + + upload.size += length upload.save() return UploadResponse(upload=upload, path=path, request=request, status=204) @@ -649,6 +670,8 @@ def partial_update(self, request, path, pk=None): def put(self, request, path, pk=None): """ Create a blob from uploaded chunks. + + Note: We do not support monolithic upload. """ _, repository = self.get_dr_push(request, path) @@ -682,56 +705,57 @@ def put(self, request, path, pk=None): task.delete() raise Exception(str(error)) - chunks = UploadChunk.objects.filter(upload=upload).order_by("offset") - - with NamedTemporaryFile("ab") as temp_file: - for chunk in chunks: - temp_file.write(chunk.file.read()) - chunk.file.close() - temp_file.flush() - - uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb"))) - - if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]: + if artifact := upload.artifact: + artifact.touch() + else: + chunks = UploadChunk.objects.filter(upload=upload).order_by("offset") + with NamedTemporaryFile("ab") as temp_file: + for chunk in chunks: + temp_file.write(chunk.file.read()) + chunk.file.close() + temp_file.flush() + + uploaded_file = PulpTemporaryUploadedFile.from_file( + File(open(temp_file.name, "rb")) + ) + if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]: + upload.delete() + raise Exception("The digest did not match") try: artifact = Artifact.init_and_validate(uploaded_file) artifact.save() except IntegrityError: artifact = Artifact.objects.get(sha256=artifact.sha256) artifact.touch() - try: - blob = models.Blob(digest=digest) - blob.save() - except IntegrityError: - blob = models.Blob.objects.get(digest=digest) - blob.touch() - try: - blob_artifact = ContentArtifact( - artifact=artifact, content=blob, relative_path=digest - ) - blob_artifact.save() - except IntegrityError: - ca = ContentArtifact.objects.get(content=blob, relative_path=digest) - if not ca.artifact: - ca.artifact = artifact - ca.save(update_fields=["artifact"]) - upload.delete() - - dispatched_task = dispatch( - add_and_remove, - shared_resources=[f"upload:{pk}"], - exclusive_resources=[repository], - kwargs={ - "repository_pk": str(repository.pk), - "add_content_units": [str(blob.pk)], - "remove_content_units": [], - }, - ) + try: + blob = models.Blob(digest=digest) + blob.save() + except IntegrityError: + blob = models.Blob.objects.get(digest=digest) + blob.touch() + try: + blob_artifact = ContentArtifact(artifact=artifact, content=blob, relative_path=digest) + blob_artifact.save() + except IntegrityError: + ca = ContentArtifact.objects.get(content=blob, relative_path=digest) + if not ca.artifact: + ca.artifact = artifact + ca.save(update_fields=["artifact"]) + upload.delete() - if has_task_completed(dispatched_task): - return BlobResponse(blob, path, 201, request) - else: - raise Exception("The digest did not match") + dispatched_task = dispatch( + add_and_remove, + shared_resources=[f"upload:{pk}"], + exclusive_resources=[repository], + kwargs={ + "repository_pk": str(repository.pk), + "add_content_units": [str(blob.pk)], + "remove_content_units": [], + }, + ) + + if has_task_completed(dispatched_task): + return BlobResponse(blob, path, 201, request) class RedirectsMixin: