Skip to content

Commit

Permalink
Improved image upload process.
Browse files Browse the repository at this point in the history
closes #797
Required PR: pulp/pulpcore#2779
  • Loading branch information
ipanova committed Jun 23, 2022
1 parent 5bcf381 commit 3ca9027
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 42 deletions.
3 changes: 3 additions & 0 deletions CHANGES/797.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Improved image upload process from podman/docker clients.
These clients send data as one big chunk hence we don't need to save it
as chunk but as an artifact directly.
19 changes: 19 additions & 0 deletions pulp_container/app/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,22 @@ def __init__(self, digest):
]
}
)


class InvalidRequest(ParseError):
"""An exception to render an HTTP 400 response."""

def __init__(self, message):
"""Initialize the exception with the digest of a signed manifest."""
message = message or "Invalid request."
super().__init__(
detail={
"errors": [
{
"code": "INVALID_REQUEST",
"message": message,
"detail": {},
}
]
}
)
20 changes: 20 additions & 0 deletions pulp_container/app/migrations/0032_upload_artifact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 3.2.13 on 2022-06-23 11:48

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('core', '0092_alter_upload_options'),
('container', '0031_replace_charf_with_textf'),
]

operations = [
migrations.AddField(
model_name='upload',
name='artifact',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='uploads', to='core.artifact'),
),
]
4 changes: 4 additions & 0 deletions pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pulpcore.plugin.download import DownloaderFactory
from pulpcore.plugin.models import (
Artifact,
AutoAddObjPermsMixin,
BaseModel,
Content,
Expand Down Expand Up @@ -633,3 +634,6 @@ class Upload(CoreUpload):
"""

repository = models.ForeignKey(Repository, related_name="uploads", on_delete=models.CASCADE)
artifact = models.ForeignKey(
Artifact, related_name="uploads", null=True, on_delete=models.SET_NULL
)
113 changes: 71 additions & 42 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from pulp_container.app.authorization import AuthorizationService
from pulp_container.app.cache import find_base_path_cached, RegistryApiCache
from pulp_container.app.exceptions import (
InvalidRequest,
RepositoryNotFound,
RepositoryInvalid,
BlobNotFound,
Expand Down Expand Up @@ -547,11 +548,13 @@ class BlobUploads(ContainerRegistryApiMixin, ViewSet):
model = models.Upload
queryset = models.Upload.objects.all()

content_range_pattern = re.compile(r"^(?P<start>\d+)-(?P<end>\d+)$")
content_range_pattern = re.compile(r"^([0-9]+)-([0-9]+)$")

def create(self, request, path):
"""
Create a new upload.
Note: We do not support monolithic upload.
"""
_, repository = self.get_dr_push(request, path, create=True)

Expand Down Expand Up @@ -620,35 +623,56 @@ def partial_update(self, request, path, pk=None):
Process a chunk that will be appended to an existing upload.
"""
_, repository = self.get_dr_push(request, path)
upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
chunk = request.META["wsgi.input"]
if "Content-Range" in request.headers or "digest" not in request.query_params:
whole = False
else:
whole = True
if range_header := request.headers.get("Content-Range"):
found = self.content_range_pattern.match(range_header)
if not found:
raise InvalidRequest(message="Invalid range header")
start = int(found.group(1))
end = int(found.group(2))
length = end - start + 1

if whole:
start = 0
else:
content_range = request.META.get("HTTP_CONTENT_RANGE", "")
match = self.content_range_pattern.match(content_range)
start = 0 if not match else int(match.group("start"))

upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
length = int(request.headers["Content-Length"])
start = 0

chunk = ContentFile(chunk.read())
with transaction.atomic():
if upload.size != start:
raise Exception

upload.append(chunk, upload.size)
upload.size += chunk.size
# if more chunks
if range_header:
chunk = ContentFile(chunk.read())
upload.append(chunk, upload.size)
else:
# 1 chunk
# do not add to the upload, create artifact right away
with NamedTemporaryFile("ab") as temp_file:
temp_file.write(chunk.read())
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
upload.artifact = artifact

upload.size += length
upload.save()

return UploadResponse(upload=upload, path=path, request=request, status=204)

def put(self, request, path, pk=None):
"""
Create a blob from uploaded chunks.
Note: We do not support monolithic upload.
"""
_, repository = self.get_dr_push(request, path)

Expand Down Expand Up @@ -682,23 +706,30 @@ def put(self, request, path, pk=None):
task.delete()
raise Exception(str(error))

chunks = UploadChunk.objects.filter(upload=upload).order_by("offset")

with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(File(open(temp_file.name, "rb")))

if uploaded_file.hashers["sha256"].hexdigest() == digest[len("sha256:") :]:
if artifact := upload.artifact:
artifact.touch()
else:
chunks = UploadChunk.objects.filter(upload=upload).order_by("offset")
with NamedTemporaryFile("ab") as temp_file:
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
if uploaded_file.hashers["sha256"].hexdigest() != digest[len("sha256:") :]:
upload.delete()
raise Exception("The digest did not match")
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()

with transaction.atomic():
try:
blob = models.Blob(digest=digest)
blob.save()
Expand All @@ -715,23 +746,21 @@ def put(self, request, path, pk=None):
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
upload.delete()

dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)
upload.delete()

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)
else:
raise Exception("The digest did not match")
dispatched_task = dispatch(
add_and_remove,
shared_resources=[f"upload:{pk}"],
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": [str(blob.pk)],
"remove_content_units": [],
},
)

if has_task_completed(dispatched_task):
return BlobResponse(blob, path, 201, request)


class RedirectsMixin:
Expand Down

0 comments on commit 3ca9027

Please sign in to comment.