Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sign manifests asynchronously #1265

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/1208.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Started signing manifests asynchronously. This feature improves the performance of signing tasks.
Additionally, setting ``MAX_PARALLEL_SIGNING_TASKS`` was introduced to cap the number of threads
used for parallel signing (defaults to ``10``).
3 changes: 3 additions & 0 deletions pulp_container/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@
}

FLATPAK_INDEX = False

# The number of allowed threads to sign manifests in parallel
MAX_PARALLEL_SIGNING_TASKS = 10
73 changes: 46 additions & 27 deletions pulp_container/app/tasks/sign.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import base64
import hashlib
import os
import tempfile

from aiofiles import tempfile
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... That name is a bit deceiving. But ok.

from asgiref.sync import sync_to_async
from django.conf import settings

from pulpcore.plugin.models import Repository
Expand All @@ -19,6 +20,8 @@
SIGNATURE_TYPE,
)

semaphore = asyncio.Semaphore(settings.MAX_PARALLEL_SIGNING_TASKS)


def sign(repository_pk, signing_service_pk, reference, tags_list=None):
"""
Expand All @@ -27,8 +30,8 @@ def sign(repository_pk, signing_service_pk, reference, tags_list=None):
Create signature for each manifest that is specified and add it to the repo.
If no manifests were specified, then sign all manifests in the repo.

What manifests to sign is identified by tag.
Manifest lists are not signed. Image manifests from manifest list are signed by digest.
What manifests to sign is identified by tags.
Manifest lists are signed too. Image manifests from the manifest lists are signed by tags.

Args:
repository_pk (uuid): A pk for a Repository for which a new Repository Version should be
Expand All @@ -47,26 +50,39 @@ def sign(repository_pk, signing_service_pk, reference, tags_list=None):
)
else:
latest_repo_content_tags = latest_version.content.filter(pulp_type=Tag.get_pulp_type())
latest_repo_tags = Tag.objects.filter(pk__in=latest_repo_content_tags)
latest_repo_tags = Tag.objects.filter(pk__in=latest_repo_content_tags).select_related(
"tagged_manifest"
)
signing_service = ManifestSigningService.objects.get(pk=signing_service_pk)
added_signatures = []
for tag in latest_repo_tags:
tagged_manifest = tag.tagged_manifest
docker_reference = ":".join((reference, tag.name))
signature_pk = create_signature(tagged_manifest, docker_reference, signing_service)
added_signatures.append(signature_pk)
if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST:
# parse ML and sign per-arches
for manifest in tagged_manifest.listed_manifests.iterator():
signature_pk = create_signature(manifest, docker_reference, signing_service)
added_signatures.append(signature_pk)

async def sign_manifests():
added_signatures = []

async for tag in latest_repo_tags.aiterator():
tagged_manifest = tag.tagged_manifest
docker_reference = ":".join((reference, tag.name))
signature_pk = await create_signature(
tagged_manifest, docker_reference, signing_service
)
added_signatures.append(signature_pk)
if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST:
# parse ML and sign per-arches
manifests_iterator = tagged_manifest.listed_manifests.aiterator()
async for manifest in manifests_iterator:
signature_pk = await create_signature(
manifest, docker_reference, signing_service
)
added_signatures.append(signature_pk)

return added_signatures

added_signatures = asyncio.run(sign_manifests())
added_signatures_qs = ManifestSignature.objects.filter(pk__in=added_signatures)
with repository.new_version() as new_version:
new_version.add_content(added_signatures_qs)


def create_signature(manifest, reference, signing_service):
async def create_signature(manifest, reference, signing_service):
"""
Create manifest signature.

Expand All @@ -81,20 +97,23 @@ def create_signature(manifest, reference, signing_service):
pk of created ManifestSignature.

"""
with tempfile.TemporaryDirectory(dir=".") as working_directory:
async with semaphore:
# download and write file for object storage
artifact = await manifest._artifacts.aget()
if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem":
manifest_file = tempfile.NamedTemporaryFile(dir=working_directory, delete=False)
artifact = manifest._artifacts.get()
manifest_file.write(artifact.file.read())
manifest_file.flush()
async with tempfile.NamedTemporaryFile(dir=".", mode="wb", delete=False) as tf:
await tf.write(await sync_to_async(artifact.file.read)())
await tf.flush()

artifact.file.close()
manifest_path = manifest_file.name
manifest_path = tf.name
else:
manifest_path = manifest._artifacts.get().file.path
sig_path = os.path.join(working_directory, "signature")
manifest_path = artifact.file.path

async with tempfile.NamedTemporaryFile(dir=".", prefix="signature") as tf:
sig_path = tf.name

signed = signing_service.sign(
signed = await signing_service.asign(
manifest_path, env_vars={"REFERENCE": reference, "SIG_PATH": sig_path}
)

Expand All @@ -115,6 +134,6 @@ def create_signature(manifest, reference, signing_service):
data=encoded_sig,
signed_manifest=manifest,
)
signature.save()
await signature.asave()

return signature.pk
Loading