Skip to content

Commit

Permalink
Sign manifests asynchronously
Browse files Browse the repository at this point in the history
closes pulp#1208
  • Loading branch information
lubosmj committed Oct 31, 2023
1 parent efe2772 commit 46076ba
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGES/1208.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Started signing manifests asynchronously. This feature improves the performance of signing tasks.
75 changes: 48 additions & 27 deletions pulp_container/app/tasks/sign.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import base64
import hashlib
import os
import tempfile

from aiofiles import tempfile
from asgiref.sync import sync_to_async
from django.conf import settings

from pulpcore.plugin.models import Repository
Expand All @@ -19,6 +20,10 @@
SIGNATURE_TYPE,
)

SIGNING_TASKS_COUNTER = 10

semaphore = asyncio.Semaphore(SIGNING_TASKS_COUNTER)


def sign(repository_pk, signing_service_pk, reference, tags_list=None):
"""
Expand All @@ -27,8 +32,8 @@ def sign(repository_pk, signing_service_pk, reference, tags_list=None):
Create signature for each manifest that is specified and add it to the repo.
If no manifests were specified, then sign all manifests in the repo.
What manifests to sign is identified by tag.
Manifest lists are not signed. Image manifests from manifest list are signed by digest.
What manifests to sign is identified by tags.
Manifest lists are signed too. Image manifests from the manifest lists are signed by tags.
Args:
repository_pk (uuid): A pk for a Repository for which a new Repository Version should be
Expand All @@ -47,26 +52,39 @@ def sign(repository_pk, signing_service_pk, reference, tags_list=None):
)
else:
latest_repo_content_tags = latest_version.content.filter(pulp_type=Tag.get_pulp_type())
latest_repo_tags = Tag.objects.filter(pk__in=latest_repo_content_tags)
latest_repo_tags = Tag.objects.filter(pk__in=latest_repo_content_tags).select_related(
"tagged_manifest"
)
signing_service = ManifestSigningService.objects.get(pk=signing_service_pk)
added_signatures = []
for tag in latest_repo_tags:
tagged_manifest = tag.tagged_manifest
docker_reference = ":".join((reference, tag.name))
signature_pk = create_signature(tagged_manifest, docker_reference, signing_service)
added_signatures.append(signature_pk)
if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST:
# parse ML and sign per-arches
for manifest in tagged_manifest.listed_manifests.iterator():
signature_pk = create_signature(manifest, docker_reference, signing_service)
added_signatures.append(signature_pk)

async def sign_manifests():
added_signatures = []

async for tag in latest_repo_tags.aiterator():
tagged_manifest = tag.tagged_manifest
docker_reference = ":".join((reference, tag.name))
signature_pk = await create_signature(
tagged_manifest, docker_reference, signing_service
)
added_signatures.append(signature_pk)
if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST:
# parse ML and sign per-arches
manifests_iterator = tagged_manifest.listed_manifests.aiterator()
async for manifest in manifests_iterator:
signature_pk = await create_signature(
manifest, docker_reference, signing_service
)
added_signatures.append(signature_pk)

return added_signatures

added_signatures = asyncio.run(sign_manifests())
added_signatures_qs = ManifestSignature.objects.filter(pk__in=added_signatures)
with repository.new_version() as new_version:
new_version.add_content(added_signatures_qs)


def create_signature(manifest, reference, signing_service):
async def create_signature(manifest, reference, signing_service):
"""
Create manifest signature.
Expand All @@ -81,20 +99,23 @@ def create_signature(manifest, reference, signing_service):
pk of created ManifestSignature.
"""
with tempfile.TemporaryDirectory(dir=".") as working_directory:
async with semaphore:
# download and write file for object storage
artifact = await manifest._artifacts.aget()
if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem":
manifest_file = tempfile.NamedTemporaryFile(dir=working_directory, delete=False)
artifact = manifest._artifacts.get()
manifest_file.write(artifact.file.read())
manifest_file.flush()
async with tempfile.NamedTemporaryFile(dir=".", mode="wb", delete=False) as tf:
await tf.write(await sync_to_async(artifact.file.read)())
await tf.flush()

artifact.file.close()
manifest_path = manifest_file.name
manifest_path = tf.name
else:
manifest_path = manifest._artifacts.get().file.path
sig_path = os.path.join(working_directory, "signature")
manifest_path = artifact.file.path

async with tempfile.NamedTemporaryFile(dir=".", prefix="signature") as tf:
sig_path = tf.name

signed = signing_service.sign(
signed = await signing_service.asign(
manifest_path, env_vars={"REFERENCE": reference, "SIG_PATH": sig_path}
)

Expand All @@ -115,6 +136,6 @@ def create_signature(manifest, reference, signing_service):
data=encoded_sig,
signed_manifest=manifest,
)
signature.save()
await signature.asave()

return signature.pk

0 comments on commit 46076ba

Please sign in to comment.