Skip to content

Commit

Permalink
Sign manifests asynchronously
Browse files Browse the repository at this point in the history
closes pulp#1208
  • Loading branch information
lubosmj committed May 17, 2023
1 parent 15aca2f commit ef0872c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES/1208.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Started signing manifests asynchronously. This feature improves the performance of signing tasks.
72 changes: 47 additions & 25 deletions pulp_container/app/tasks/sign.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import asyncio
import base64
import hashlib
import os
import tempfile

from aiofiles import tempfile
from django.conf import settings

from pulpcore.plugin.models import Repository
from pulpcore.plugin.sync import sync_to_async

from pulp_container.app.models import (
ManifestSignature,
Expand All @@ -19,6 +20,10 @@
SIGNATURE_TYPE,
)

SIGNING_TASKS_COUNTER = 10

semaphore = asyncio.Semaphore(SIGNING_TASKS_COUNTER)


def sign(repository_pk, signing_service_pk, reference, tags_list=None):
"""
Expand Down Expand Up @@ -47,26 +52,40 @@ def sign(repository_pk, signing_service_pk, reference, tags_list=None):
)
else:
latest_repo_content_tags = latest_version.content.filter(pulp_type=Tag.get_pulp_type())
latest_repo_tags = Tag.objects.filter(pk__in=latest_repo_content_tags)
latest_repo_tags = Tag.objects.filter(pk__in=latest_repo_content_tags).select_related(
"tagged_manifest"
)
signing_service = ManifestSigningService.objects.get(pk=signing_service_pk)
added_signatures = []
for tag in latest_repo_tags:
tagged_manifest = tag.tagged_manifest
docker_reference = ":".join((reference, tag.name))
signature_pk = create_signature(tagged_manifest, docker_reference, signing_service)
added_signatures.append(signature_pk)
if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST:
# parse ML and sign per-arches
for manifest in tagged_manifest.listed_manifests.iterator():
signature_pk = create_signature(manifest, docker_reference, signing_service)
added_signatures.append(signature_pk)

async def sign_manifests():
added_signatures = []

async for tag in latest_repo_tags.aiterator():
tagged_manifest = tag.tagged_manifest
docker_reference = ":".join((reference, tag.name))
signature_pk = await create_signature(
tagged_manifest, docker_reference, signing_service
)
added_signatures.append(signature_pk)
if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST:
# parse ML and sign per-arches
added_signatures = []
manifests_iterator = tagged_manifest.listed_manifests.aiterator()
async for manifest in manifests_iterator:
signature_pk = await create_signature(
manifest, docker_reference, signing_service
)
added_signatures.append(signature_pk)

return added_signatures

added_signatures = asyncio.run(sign_manifests())
added_signatures_qs = ManifestSignature.objects.filter(pk__in=added_signatures)
with repository.new_version() as new_version:
new_version.add_content(added_signatures_qs)


def create_signature(manifest, reference, signing_service):
async def create_signature(manifest, reference, signing_service):
"""
Create manifest signature.
Expand All @@ -81,20 +100,23 @@ def create_signature(manifest, reference, signing_service):
pk of created ManifestSignature.
"""
with tempfile.TemporaryDirectory(dir=".") as working_directory:
async with semaphore:
# download and write file for object storage
artifact = await manifest._artifacts.aget()
if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem":
manifest_file = tempfile.NamedTemporaryFile(dir=working_directory, delete=False)
artifact = manifest._artifacts.get()
manifest_file.write(artifact.file.read())
manifest_file.flush()
async with tempfile.NamedTemporaryFile(dir=".", mode="wb", delete=False) as tf:
await tf.write(await sync_to_async(artifact.file.read)())
await tf.flush()

artifact.file.close()
manifest_path = manifest_file.name
manifest_path = tf.name
else:
manifest_path = manifest._artifacts.get().file.path
sig_path = os.path.join(working_directory, "signature")
manifest_path = artifact.file.path

async with tempfile.NamedTemporaryFile(dir=".", prefix="signature") as tf:
sig_path = tf.name

signed = signing_service.sign(
signed = await signing_service.asign(
manifest_path, env_vars={"REFERENCE": reference, "SIG_PATH": sig_path}
)

Expand All @@ -115,6 +137,6 @@ def create_signature(manifest, reference, signing_service):
data=encoded_sig,
signed_manifest=manifest,
)
signature.save()
await signature.asave()

return signature.pk

0 comments on commit ef0872c

Please sign in to comment.