Skip to content

Commit

Permalink
Debug upload
Browse files Browse the repository at this point in the history
[noissue]
Required PR: pulp/pulpcore#2779
  • Loading branch information
ipanova committed May 30, 2022
1 parent 43f7987 commit 0ab7d30
Showing 1 changed file with 69 additions and 41 deletions.
110 changes: 69 additions & 41 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,37 @@ def partial_update(self, request, path, pk=None):

upload = get_object_or_404(models.Upload, repository=repository, pk=pk)

chunk = ContentFile(chunk.read())
if range_header:= request.headers.get("Content-Range"):
RANGE_HEADER_REGEX = re.compile(r"^([0-9]+)-([0-9]+)$")
found = RANGE_HEADER_REGEX.match(range_header)
start_offset = int(found.group(1))
end_offset = int(found.group(2))
length = end_offset - start_offset + 1
else:
length = int(request.headers["Content-Length"])

with transaction.atomic():
if upload.size != start:
raise Exception

upload.append(chunk, upload.size)
upload.size += chunk.size
#if more chunks
if range_header:
upload.append(chunk, upload.size)
else:
# 1 chunk
with NamedTemporaryFile("ab") as temp_file:
temp_file.write(chunk.read())
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb")))
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()

upload.size += length
upload.save()

return UploadResponse(upload=upload, path=path, request=request, status=204)
Expand Down Expand Up @@ -684,54 +708,58 @@ def put(self, request, path, pk=None):

chunks = UploadChunk.objects.filter(upload=upload).order_by("offset")

with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()
if chunks.count():

uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb")))
with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()

if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]:
uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb")))
if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]:
raise Exception("The digest did not match")
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(
artifact=artifact, content=blob, relative_path=digest
)
blob_artifact.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
upload.delete()

dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
else:
# what if no artifact, raise error
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(
artifact=artifact, content=blob, relative_path=digest
)
blob_artifact.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
upload.delete()

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)
else:
raise Exception("The digest did not match")
dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)


class RedirectsMixin:
Expand Down

0 comments on commit 0ab7d30

Please sign in to comment.