
Commit

Debug upload
[noissue]
Required PR: pulp/pulpcore#2779
ipanova committed May 30, 2022
1 parent 222bd76 commit b164c80
Showing 1 changed file with 75 additions and 53 deletions.
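
For context, the endpoints touched by this commit implement the blob-upload flow of the OCI distribution spec: POST opens an upload session, each PATCH appends data (carrying a Content-Range header when the upload is chunked), and a closing PUT with a digest query parameter turns the accumulated bytes into a blob. A rough client-side sketch of that flow, assuming the requests library and a hypothetical local registry URL, repository name, and payload:

    # Hedged sketch of the upload flow exercised by this change; REGISTRY,
    # REPO, and the payload are made-up placeholders, not project defaults.
    import hashlib
    import requests

    REGISTRY = "http://localhost:24817"  # assumed registry base URL
    REPO = "myorg/myimage"
    blob = b"example layer bytes"
    digest = "sha256:" + hashlib.sha256(blob).hexdigest()

    # 1. POST opens the upload session (handled by create()).
    r = requests.post(f"{REGISTRY}/v2/{REPO}/blobs/uploads/")
    location = r.headers["Location"]  # assumed relative; where to send data next

    # 2. PATCH sends a chunk (handled by partial_update()); Content-Range
    #    marks it as one chunk of a chunked upload.
    headers = {
        "Content-Range": f"0-{len(blob) - 1}",
        "Content-Length": str(len(blob)),
    }
    r = requests.patch(f"{REGISTRY}{location}", data=blob, headers=headers)
    location = r.headers["Location"]

    # 3. PUT with the digest finalizes the blob (handled by put()).
    r = requests.put(f"{REGISTRY}{location}", params={"digest": digest})
    assert r.status_code == 201  # BlobResponse returns 201 Created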
128 changes: 75 additions & 53 deletions pulp_container/app/registry_api.py
@@ -18,7 +18,7 @@
 from tempfile import NamedTemporaryFile
 
 from django.core.files.storage import default_storage as storage
-from django.core.files.base import ContentFile, File
+from django.core.files.base import File
 from django.db import IntegrityError, transaction
 from django.shortcuts import get_object_or_404
 
@@ -547,11 +547,13 @@ class BlobUploads(ContainerRegistryApiMixin, ViewSet):
     model = models.Upload
     queryset = models.Upload.objects.all()
 
-    content_range_pattern = re.compile(r"^(?P<start>\d+)-(?P<end>\d+)$")
+    content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$")
 
     def create(self, request, path):
         """
         Create a new upload.
+
+        Note: We do not support monolithic upload.
         """
         _, repository = self.get_dr_push(request, path, create=True)
 
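Note the regex swap above: the named groups are gone, so the parsing code now reads positional groups via match.group(1) and match.group(2) instead of match.group("start"). A standalone illustration of what the new pattern accepts:

    import re

    content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$")

    match = content_range_pattern.match("0-9999")
    start, end = int(match.group(1)), int(match.group(2))
    print(start, end)       # 0 9999
    print(end - start + 1)  # 10000 -- the chunk length partial_update() computes

    # Anything other than bare "<start>-<end>" digits is rejected:
    assert content_range_pattern.match("bytes 0-9999/10000") is None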
@@ -621,34 +623,51 @@ def partial_update(self, request, path, pk=None):
         """
         _, repository = self.get_dr_push(request, path)
         chunk = request.META["wsgi.input"]
-        if "Content-Range" in request.headers or "digest" not in request.query_params:
-            whole = False
-        else:
-            whole = True
-
-        if whole:
-            start = 0
-        else:
-            content_range = request.META.get("HTTP_CONTENT_RANGE", "")
-            match = self.content_range_pattern.match(content_range)
-            start = 0 if not match else int(match.group("start"))
+        if range_header := request.headers.get("Content-Range"):
+            found = self.content_range_pattern.match(range_header)
+            start = int(found.group(1))
+            end = int(found.group(2))
+            length = end - start + 1
+
+        else:
+            length = int(request.headers["Content-Length"])
+            start = 0
 
         upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
 
-        chunk = ContentFile(chunk.read())
         with transaction.atomic():
             if upload.size != start:
                 raise Exception
 
-            upload.append(chunk, upload.size)
-            upload.size += chunk.size
+            # if more chunks
+            if range_header:
+                upload.append(chunk, upload.size)
+            else:
+                # 1 chunk
+                with NamedTemporaryFile("ab") as temp_file:
+                    temp_file.write(chunk.read())
+                    temp_file.flush()
+
+                    uploaded_file = PulpTemporaryUploadedFile.from_file(
+                        File(open(temp_file.name, "rb"))
+                    )
+                    try:
+                        artifact = Artifact.init_and_validate(uploaded_file)
+                        artifact.save()
+                    except IntegrityError:
+                        artifact = Artifact.objects.get(sha256=artifact.sha256)
+                        artifact.touch()
+
+            upload.size += length
             upload.save()
 
         return UploadResponse(upload=upload, path=path, request=request, status=204)
 
     def put(self, request, path, pk=None):
         """
         Create a blob from uploaded chunks.
+
+        Note: We do not support monolithic upload.
         """
         _, repository = self.get_dr_push(request, path)
 
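The rewritten partial_update() above branches on the Content-Range header: chunks that carry it are appended to the pulpcore Upload with upload.append() (the behavior that needs pulp/pulpcore#2779), while a single PATCH without it is promoted to an Artifact on the spot. The offset bookkeeping reduces to the arithmetic below, shown as a standalone sketch with stand-in values rather than the real request and Upload objects:

    # Standalone sketch of the offset bookkeeping in partial_update().
    def chunk_bounds(headers: dict) -> tuple[int, int]:
        """Return (start, length) for a PATCH request, mirroring the diff."""
        if range_header := headers.get("Content-Range"):
            start_str, end_str = range_header.split("-")
            start, end = int(start_str), int(end_str)
            return start, end - start + 1
        # No Content-Range: the whole blob arrived in this single request.
        return 0, int(headers["Content-Length"])

    upload_size = 10_000  # bytes received so far
    start, length = chunk_bounds({"Content-Range": "10000-19999"})
    assert start == upload_size   # mirrors `if upload.size != start: raise`
    upload_size += length         # mirrors `upload.size += length`
    assert upload_size == 20_000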
@@ -684,54 +703,57 @@ def put(self, request, path, pk=None):
 
         chunks = UploadChunk.objects.filter(upload=upload).order_by("offset")
 
-        with NamedTemporaryFile("ab") as temp_file:
-            for chunk in chunks:
-                temp_file.write(chunk.file.read())
-                chunk.file.close()
-            temp_file.flush()
-
-            uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb")))
-
-            if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]:
-                try:
-                    artifact = Artifact.init_and_validate(uploaded_file)
-                    artifact.save()
-                except IntegrityError:
-                    artifact = Artifact.objects.get(sha256=artifact.sha256)
-                    artifact.touch()
-                try:
-                    blob = models.Blob(digest=digest)
-                    blob.save()
-                except IntegrityError:
-                    blob = models.Blob.objects.get(digest=digest)
-                    blob.touch()
-                try:
-                    blob_artifact = ContentArtifact(
-                        artifact=artifact, content=blob, relative_path=digest
-                    )
-                    blob_artifact.save()
-                except IntegrityError:
-                    ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
-                    if not ca.artifact:
-                        ca.artifact = artifact
-                        ca.save(update_fields=["artifact"])
-                upload.delete()
-
-                dispatched_task = dispatch(
-                    add_and_remove,
-                    shared_resources=[f"upload:{pk}"],
-                    exclusive_resources=[repository],
-                    kwargs={
-                        "repository_pk": str(repository.pk),
-                        "add_content_units": [str(blob.pk)],
-                        "remove_content_units": [],
-                    },
-                )
-
-                if has_task_completed(dispatched_task):
-                    return BlobResponse(blob, path, 201, request)
-            else:
-                raise Exception("The digest did not match")
+        if chunks.count():
+
+            with NamedTemporaryFile("ab") as temp_file:
+                for chunk in chunks:
+                    temp_file.write(chunk.file.read())
+                    chunk.file.close()
+                temp_file.flush()
+
+                uploaded_file = PulpTemporaryUploadedFile.from_file(
+                    File(open(temp_file.name, "rb"))
+                )
+                if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]:
+                    raise Exception("The digest did not match")
+                try:
+                    artifact = Artifact.init_and_validate(uploaded_file)
+                    artifact.save()
+                except IntegrityError:
+                    artifact = Artifact.objects.get(sha256=artifact.sha256)
+                    artifact.touch()
+        else:
+            artifact = Artifact.objects.get(sha256=digest[len("sha256:") :])
+            artifact.touch()
+        try:
+            blob = models.Blob(digest=digest)
+            blob.save()
+        except IntegrityError:
+            blob = models.Blob.objects.get(digest=digest)
+            blob.touch()
+        try:
+            blob_artifact = ContentArtifact(artifact=artifact, content=blob, relative_path=digest)
+            blob_artifact.save()
+        except IntegrityError:
+            ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
+            if not ca.artifact:
+                ca.artifact = artifact
+                ca.save(update_fields=["artifact"])
+        upload.delete()
+
+        dispatched_task = dispatch(
+            add_and_remove,
+            shared_resources=[f"upload:{pk}"],
+            exclusive_resources=[repository],
+            kwargs={
+                "repository_pk": str(repository.pk),
+                "add_content_units": [str(blob.pk)],
+                "remove_content_units": [],
+            },
+        )
+
+        if has_task_completed(dispatched_task):
+            return BlobResponse(blob, path, 201, request)
 
 
 class RedirectsMixin:
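
put() now mirrors that split: when UploadChunk rows exist they are stitched into a temporary file and hashed, otherwise the Artifact already created by the single-chunk PATCH is fetched by digest; either way the blob then reaches the repository through the dispatched add_and_remove task. The digest verification amounts to the following standalone check (a hypothetical helper; the real code reads the hash off PulpTemporaryUploadedFile.hashers):

    import hashlib

    def verify_digest(path: str, digest: str) -> None:
        """Recompute sha256 over an assembled upload and compare, as put() does."""
        hasher = hashlib.sha256()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(1 << 20), b""):  # 1 MiB at a time
                hasher.update(block)
        if hasher.hexdigest() != digest[len("sha256:"):]:
            raise Exception("The digest did not match")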
