diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index 649a859371..c0cb70a268 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -5,11 +5,10 @@ from aiofiles import os as aos from asgiref.sync import sync_to_async -from django.db.models import Prefetch, prefetch_related_objects, Q +from django.db.models import Prefetch, prefetch_related_objects from pulpcore.plugin.exceptions import UnsupportedDigestValidationError from pulpcore.plugin.models import ( - AlternateContentSource, Artifact, ContentArtifact, ProgressReport, @@ -472,52 +471,43 @@ class ACSArtifactHandler(Stage): async def run(self): async for batch in self.batches(): - acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain) - acs_exists = await acs_query.aexists() - if acs_exists: - # Gather batch d_artifact checksums - batch_checksums = defaultdict(list) - for d_content in batch: - for d_artifact in d_content.d_artifacts: - for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS: - if getattr(d_artifact.artifact, cks_type): - batch_checksums[cks_type].append( - getattr(d_artifact.artifact, cks_type) - ) - - batch_query = Q() - for checksum_type in batch_checksums.keys(): - batch_query.add( - Q(**{f"{checksum_type}__in": batch_checksums[checksum_type]}), Q.OR - ) + # Gather batch d_artifact checksums + batch_checksums = defaultdict(list) + for d_content in batch: + for d_artifact in d_content.d_artifacts: + for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS: + if getattr(d_artifact.artifact, cks_type): + batch_checksums[cks_type].append(getattr(d_artifact.artifact, cks_type)) + existing_ras_dict = dict() + for checksum_type in batch_checksums.keys(): existing_ras = ( RemoteArtifact.objects.acs() - .filter(batch_query) - .only("url", "remote") + .filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]}) + .only("url", checksum_type, "remote") .select_related("remote") ) - existing_ras_dict = dict() - async for ra in existing_ras: - for c_type in Artifact.COMMON_DIGEST_FIELDS: - checksum = await sync_to_async(getattr)(ra, c_type) - # pick the first occurence of RA from ACS - if checksum and checksum not in existing_ras_dict: - existing_ras_dict[checksum] = { - "remote": ra.remote, - "url": ra.url, - } + # todo: we could probably get rid of this select_related by separating + # out the remote query + async for ra in existing_ras.aiterator(): + checksum = getattr(ra, checksum_type) + # pick the first occurence of RA from ACS + if checksum not in existing_ras_dict: + existing_ras_dict[checksum] = { + "remote": ra.remote, + "url": ra.url, + } - for d_content in batch: - for d_artifact in d_content.d_artifacts: - for checksum_type in Artifact.COMMON_DIGEST_FIELDS: - if getattr(d_artifact.artifact, checksum_type): - checksum = getattr(d_artifact.artifact, checksum_type) - if checksum in existing_ras_dict: - d_artifact.urls = [ - existing_ras_dict[checksum]["url"] - ] + d_artifact.urls - d_artifact.remote = existing_ras_dict[checksum]["remote"] + for d_content in batch: + for d_artifact in d_content.d_artifacts: + for checksum_type in Artifact.COMMON_DIGEST_FIELDS: + if getattr(d_artifact.artifact, checksum_type): + checksum = getattr(d_artifact.artifact, checksum_type) + if checksum in existing_ras_dict: + d_artifact.urls = [ + existing_ras_dict[checksum]["url"] + ] + d_artifact.urls + d_artifact.remote = existing_ras_dict[checksum]["remote"] for d_content in batch: await self.put(d_content) diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 0c1a40a745..df6bc1ec01 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -1,20 +1,22 @@ import asyncio import tempfile -from .api import create_pipeline, EndStage -from .artifact_stages import ( +from pulpcore.plugin.models import AlternateContentSource +from pulpcore.plugin.stages.api import create_pipeline, EndStage +from pulpcore.plugin.stages.artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, ArtifactSaver, QueryExistingArtifacts, RemoteArtifactSaver, ) -from .content_stages import ( +from pulpcore.plugin.stages.content_stages import ( ContentAssociation, ContentSaver, QueryExistingContents, ResolveContentFutures, ) +from pulpcore.plugin.util import get_domain_pk class DeclarativeVersion: @@ -131,7 +133,7 @@ def pipeline_stages(self, new_version): self.first_stage, QueryExistingArtifacts(), ] - if self.acs: + if self.acs and AlternateContentSource.objects.filter(pulp_domain=get_domain_pk()).exists(): pipeline.append(ACSArtifactHandler()) pipeline.extend( [