Skip to content

Commit

Permalink
Improved image upload process.
Browse files Browse the repository at this point in the history
closes pulp#797
Required PR: pulp/pulpcore#2779
  • Loading branch information
ipanova committed Jun 14, 2022
1 parent 222bd76 commit b2ee794
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 55 deletions.
3 changes: 3 additions & 0 deletions CHANGES/797.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Improved the image upload process from podman/docker clients.
These clients send data as one big chunk, hence we don't need to save it
as a chunk but as an artifact directly.
136 changes: 81 additions & 55 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from tempfile import NamedTemporaryFile

from django.core.files.storage import default_storage as storage
from django.core.files.base import ContentFile, File
from django.core.files.base import File
from django.db import IntegrityError, transaction
from django.shortcuts import get_object_or_404

Expand Down Expand Up @@ -547,11 +547,13 @@ class BlobUploads(ContainerRegistryApiMixin, ViewSet):
model = models.Upload
queryset = models.Upload.objects.all()

content_range_pattern = re.compile(r"^(?P<start>\d+)-(?P<end>\d+)$")
content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$")

def create(self, request, path):
"""
Create a new upload.
Note: We do not support monolithic upload.
"""
_, repository = self.get_dr_push(request, path, create=True)

Expand Down Expand Up @@ -620,35 +622,56 @@ def partial_update(self, request, path, pk=None):
Process a chunk that will be appended to an existing upload.
"""
_, repository = self.get_dr_push(request, path)
upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
chunk = request.META["wsgi.input"]
if "Content-Range" in request.headers or "digest" not in request.query_params:
whole = False
else:
whole = True
if range_header := request.headers.get("Content-Range"):
found = self.content_range_pattern.match(range_header)
start = int(found.group(1))
end = int(found.group(2))
length = end - start + 1

if whole:
start = 0
else:
content_range = request.META.get("HTTP_CONTENT_RANGE", "")
match = self.content_range_pattern.match(content_range)
start = 0 if not match else int(match.group("start"))

upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
# is this header always present?
length = int(request.headers["Content-Length"])
start = 0

chunk = ContentFile(chunk.read())
with transaction.atomic():
if upload.size != start:
raise Exception

upload.append(chunk, upload.size)
upload.size += chunk.size
# if more chunks
if range_header:
upload.append(chunk, upload.size)
else:
# 1 chunk
# do not add to the upload, create artifact right away
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=12345, stdoutToServer=True, stderrToServer=True)
with NamedTemporaryFile("ab") as temp_file:
temp_file.write(chunk.read())
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
# can use artifact.size instead

upload.size += length
upload.save()

return UploadResponse(upload=upload, path=path, request=request, status=204)

def put(self, request, path, pk=None):
"""
Create a blob from uploaded chunks.
Note: We do not support monolithic upload.
"""
_, repository = self.get_dr_push(request, path)

Expand Down Expand Up @@ -684,54 +707,57 @@ def put(self, request, path, pk=None):

chunks = UploadChunk.objects.filter(upload=upload).order_by("offset")

with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()
if chunks.exists():

uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb")))
with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()

if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]:
uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]:
raise Exception("The digest did not match")
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(
artifact=artifact, content=blob, relative_path=digest
)
blob_artifact.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
upload.delete()

dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)
else:
raise Exception("The digest did not match")
artifact = Artifact.objects.get(sha256=digest[len("sha256:") :])
artifact.touch()
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(artifact=artifact, content=blob, relative_path=digest)
blob_artifact.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
upload.delete()

dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)


class RedirectsMixin:
Expand Down

0 comments on commit b2ee794

Please sign in to comment.