From 58672589d559ee098148cffdf60524cf2d30250e Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Sun, 13 Aug 2023 23:37:29 -0400 Subject: [PATCH 1/3] Optimize ACS sync stage Lift some queries that were performed per-batch outside of the loop, resolving an N+1 where N=number of batches. Also iterate the queryset with .aiterator() [noissue] --- pulpcore/plugin/stages/artifact_stages.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index 649a859371..38877b0a8e 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -471,9 +471,9 @@ class ACSArtifactHandler(Stage): """ async def run(self): + acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain) + acs_exists = await acs_query.aexists() async for batch in self.batches(): - acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain) - acs_exists = await acs_query.aexists() if acs_exists: # Gather batch d_artifact checksums batch_checksums = defaultdict(list) @@ -492,15 +492,12 @@ async def run(self): ) existing_ras = ( - RemoteArtifact.objects.acs() - .filter(batch_query) - .only("url", "remote") - .select_related("remote") + RemoteArtifact.objects.acs().filter(batch_query).select_related("remote") ) existing_ras_dict = dict() - async for ra in existing_ras: + async for ra in existing_ras.aiterator(): for c_type in Artifact.COMMON_DIGEST_FIELDS: - checksum = await sync_to_async(getattr)(ra, c_type) + checksum = getattr(ra, c_type) # todo: this used to be async-protected # pick the first occurence of RA from ACS if checksum and checksum not in existing_ras_dict: existing_ras_dict[checksum] = { From 7d24ec4ab6efeb8b58daddbf20d7d8a6e6f10880 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Sun, 13 Aug 2023 23:38:36 -0400 Subject: [PATCH 2/3] Optimize ACS sync stage Instead of performing a gigantic AND-ed OR clause query, break up the list of 
remote artifacts by checksum type and perform one IN query per type of checksum, which ought to be easily indexable. [noissue] --- pulpcore/plugin/stages/artifact_stages.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index 38877b0a8e..fb9e2725f6 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -5,7 +5,7 @@ from aiofiles import os as aos from asgiref.sync import sync_to_async -from django.db.models import Prefetch, prefetch_related_objects, Q +from django.db.models import Prefetch, prefetch_related_objects from pulpcore.plugin.exceptions import UnsupportedDigestValidationError from pulpcore.plugin.models import ( @@ -485,21 +485,18 @@ async def run(self): getattr(d_artifact.artifact, cks_type) ) - batch_query = Q() + existing_ras_dict = dict() for checksum_type in batch_checksums.keys(): - batch_query.add( - Q(**{f"{checksum_type}__in": batch_checksums[checksum_type]}), Q.OR + existing_ras = ( + RemoteArtifact.objects.acs() + .filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]}) + .only("url", checksum_type, "remote") + .select_related("remote") ) - - existing_ras = ( - RemoteArtifact.objects.acs().filter(batch_query).select_related("remote") - ) - existing_ras_dict = dict() - async for ra in existing_ras.aiterator(): - for c_type in Artifact.COMMON_DIGEST_FIELDS: - checksum = getattr(ra, c_type) # todo: this used to be async-protected + async for ra in existing_ras.aiterator(): + checksum = getattr(ra, checksum_type) # pick the first occurence of RA from ACS - if checksum and checksum not in existing_ras_dict: + if checksum not in existing_ras_dict: existing_ras_dict[checksum] = { "remote": ra.remote, "url": ra.url, From 516114fd7be97deb4387f91e4a4f28126605d022 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Mon, 14 Aug 2023 13:43:14 -0400 Subject: [PATCH 3/3] 
Optimize ACS sync stage Lift the ACS domain check out of the stage entirely. [noissue] --- pulpcore/plugin/stages/artifact_stages.py | 76 +++++++++---------- pulpcore/plugin/stages/declarative_version.py | 10 ++- 2 files changed, 42 insertions(+), 44 deletions(-) diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index fb9e2725f6..c0cb70a268 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -9,7 +9,6 @@ from pulpcore.plugin.exceptions import UnsupportedDigestValidationError from pulpcore.plugin.models import ( - AlternateContentSource, Artifact, ContentArtifact, ProgressReport, @@ -471,47 +470,44 @@ class ACSArtifactHandler(Stage): """ async def run(self): - acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain) - acs_exists = await acs_query.aexists() async for batch in self.batches(): - if acs_exists: - # Gather batch d_artifact checksums - batch_checksums = defaultdict(list) - for d_content in batch: - for d_artifact in d_content.d_artifacts: - for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS: - if getattr(d_artifact.artifact, cks_type): - batch_checksums[cks_type].append( - getattr(d_artifact.artifact, cks_type) - ) - - existing_ras_dict = dict() - for checksum_type in batch_checksums.keys(): - existing_ras = ( - RemoteArtifact.objects.acs() - .filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]}) - .only("url", checksum_type, "remote") - .select_related("remote") - ) - async for ra in existing_ras.aiterator(): - checksum = getattr(ra, checksum_type) - # pick the first occurence of RA from ACS - if checksum not in existing_ras_dict: - existing_ras_dict[checksum] = { - "remote": ra.remote, - "url": ra.url, - } + # Gather batch d_artifact checksums + batch_checksums = defaultdict(list) + for d_content in batch: + for d_artifact in d_content.d_artifacts: + for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS: + if 
getattr(d_artifact.artifact, cks_type): + batch_checksums[cks_type].append(getattr(d_artifact.artifact, cks_type)) + + existing_ras_dict = dict() + for checksum_type in batch_checksums.keys(): + existing_ras = ( + RemoteArtifact.objects.acs() + .filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]}) + .only("url", checksum_type, "remote") + .select_related("remote") + ) + # todo: we could probably get rid of this select_related by separating + # out the remote query + async for ra in existing_ras.aiterator(): + checksum = getattr(ra, checksum_type) + # pick the first occurence of RA from ACS + if checksum not in existing_ras_dict: + existing_ras_dict[checksum] = { + "remote": ra.remote, + "url": ra.url, + } - for d_content in batch: - for d_artifact in d_content.d_artifacts: - for checksum_type in Artifact.COMMON_DIGEST_FIELDS: - if getattr(d_artifact.artifact, checksum_type): - checksum = getattr(d_artifact.artifact, checksum_type) - if checksum in existing_ras_dict: - d_artifact.urls = [ - existing_ras_dict[checksum]["url"] - ] + d_artifact.urls - d_artifact.remote = existing_ras_dict[checksum]["remote"] + for d_content in batch: + for d_artifact in d_content.d_artifacts: + for checksum_type in Artifact.COMMON_DIGEST_FIELDS: + if getattr(d_artifact.artifact, checksum_type): + checksum = getattr(d_artifact.artifact, checksum_type) + if checksum in existing_ras_dict: + d_artifact.urls = [ + existing_ras_dict[checksum]["url"] + ] + d_artifact.urls + d_artifact.remote = existing_ras_dict[checksum]["remote"] for d_content in batch: await self.put(d_content) diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 0c1a40a745..df6bc1ec01 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -1,20 +1,22 @@ import asyncio import tempfile -from .api import create_pipeline, EndStage -from .artifact_stages import ( +from pulpcore.plugin.models 
import AlternateContentSource +from pulpcore.plugin.stages.api import create_pipeline, EndStage +from pulpcore.plugin.stages.artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, ArtifactSaver, QueryExistingArtifacts, RemoteArtifactSaver, ) -from .content_stages import ( +from pulpcore.plugin.stages.content_stages import ( ContentAssociation, ContentSaver, QueryExistingContents, ResolveContentFutures, ) +from pulpcore.plugin.util import get_domain_pk class DeclarativeVersion: @@ -131,7 +133,7 @@ def pipeline_stages(self, new_version): self.first_stage, QueryExistingArtifacts(), ] - if self.acs: + if self.acs and AlternateContentSource.objects.filter(pulp_domain=get_domain_pk()).exists(): pipeline.append(ACSArtifactHandler()) pipeline.extend( [