Skip to content

Commit

Permalink
Improved image upload process.
Browse files Browse the repository at this point in the history
closes #797
Required PR: pulp/pulpcore#2779
  • Loading branch information
ipanova committed Jun 23, 2022
1 parent 601f31e commit 3875a6c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 58 deletions.
3 changes: 3 additions & 0 deletions CHANGES/797.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Improved the image upload process for podman/docker clients.
These clients send the data as one big chunk, so we do not need to save it
as a chunk; it is saved directly as an artifact instead.
20 changes: 20 additions & 0 deletions pulp_container/app/migrations/0032_upload_artifact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 3.2.13 on 2022-06-23 10:35

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Add a nullable ``artifact`` foreign key to the ``upload`` model."""

    # Migrations that must be applied before this one.
    dependencies = [
        ("core", "0092_alter_upload_options"),
        ("container", "0031_replace_charf_with_textf"),
    ]

    operations = [
        migrations.AddField(
            model_name="upload",
            name="artifact",
            # Nullable reference to core.artifact; on_delete is PROTECT, and the
            # reverse accessor on the artifact side is named "uploads".
            field=models.ForeignKey(
                null=True,
                on_delete=django.db.models.deletion.PROTECT,
                related_name="uploads",
                to="core.artifact",
            ),
        ),
    ]
2 changes: 2 additions & 0 deletions pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pulpcore.plugin.download import DownloaderFactory
from pulpcore.plugin.models import (
Artifact,
AutoAddObjPermsMixin,
BaseModel,
Content,
Expand Down Expand Up @@ -633,3 +634,4 @@ class Upload(CoreUpload):
"""

repository = models.ForeignKey(Repository, related_name="uploads", on_delete=models.CASCADE)
artifact = models.ForeignKey(Artifact, related_name="uploads", null=True, on_delete=models.PROTECT)
140 changes: 82 additions & 58 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,11 +547,13 @@ class BlobUploads(ContainerRegistryApiMixin, ViewSet):
model = models.Upload
queryset = models.Upload.objects.all()

content_range_pattern = re.compile(r"^(?P<start>\d+)-(?P<end>\d+)$")
content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$")

def create(self, request, path):
"""
Create a new upload.
Note: We do not support monolithic upload.
"""
_, repository = self.get_dr_push(request, path, create=True)

Expand Down Expand Up @@ -620,35 +622,56 @@ def partial_update(self, request, path, pk=None):
Process a chunk that will be appended to an existing upload.
"""
_, repository = self.get_dr_push(request, path)
upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
chunk = request.META["wsgi.input"]
if "Content-Range" in request.headers or "digest" not in request.query_params:
whole = False
else:
whole = True
if range_header := request.headers.get("Content-Range"):
found = self.content_range_pattern.match(range_header)
start = int(found.group(1))
end = int(found.group(2))
length = end - start + 1

if whole:
start = 0
else:
content_range = request.META.get("HTTP_CONTENT_RANGE", "")
match = self.content_range_pattern.match(content_range)
start = 0 if not match else int(match.group("start"))

upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
# is this header always present?
length = int(request.headers["Content-Length"])
start = 0

chunk = ContentFile(chunk.read())
with transaction.atomic():
if upload.size != start:
raise Exception

upload.append(chunk, upload.size)
upload.size += chunk.size
# if more chunks
if range_header:
chunk = ContentFile(chunk.read())
upload.append(chunk, upload.size)
else:
# 1 chunk
# do not add to the upload, create artifact right away
with NamedTemporaryFile("ab") as temp_file:
temp_file.write(chunk.read())
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
upload.artifact = artifact
# can use artifact.size instead

upload.size += length
upload.save()

return UploadResponse(upload=upload, path=path, request=request, status=204)

def put(self, request, path, pk=None):
"""
Create a blob from uploaded chunks.
Note: We do not support monolithic upload.
"""
_, repository = self.get_dr_push(request, path)

Expand Down Expand Up @@ -682,56 +705,57 @@ def put(self, request, path, pk=None):
task.delete()
raise Exception(str(error))

chunks = UploadChunk.objects.filter(upload=upload).order_by("offset")

with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb")))

if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]:
if artifact := upload.artifact:
artifact.touch()
else:
chunks = UploadChunk.objects.filter(upload=upload).order_by("offset")
with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]:
upload.delete()
raise Exception("The digest did not match")
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(
artifact=artifact, content=blob, relative_path=digest
)
blob_artifact.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
upload.delete()

dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(artifact=artifact, content=blob, relative_path=digest)
blob_artifact.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
upload.delete()

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)
else:
raise Exception("The digest did not match")
dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)


class RedirectsMixin:
Expand Down

0 comments on commit 3875a6c

Please sign in to comment.