Skip to content

Commit

Permalink
Optimize ACS sync stage
Browse files Browse the repository at this point in the history
Lift the check for whether any ACS exists in the domain out of the stage entirely; the stage is now only added to the pipeline when one does.

[noissue]
  • Loading branch information
dralley committed Aug 13, 2024
1 parent 7d24ec4 commit 9305eb8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 44 deletions.
75 changes: 35 additions & 40 deletions pulpcore/plugin/stages/artifact_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from pulpcore.plugin.exceptions import UnsupportedDigestValidationError
from pulpcore.plugin.models import (
AlternateContentSource,
Artifact,
ContentArtifact,
ProgressReport,
Expand Down Expand Up @@ -471,47 +470,43 @@ class ACSArtifactHandler(Stage):
"""

async def run(self):
acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain)
acs_exists = await acs_query.aexists()
async for batch in self.batches():
if acs_exists:
# Gather batch d_artifact checksums
batch_checksums = defaultdict(list)
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, cks_type):
batch_checksums[cks_type].append(
getattr(d_artifact.artifact, cks_type)
)

existing_ras_dict = dict()
for checksum_type in batch_checksums.keys():
existing_ras = (
RemoteArtifact.objects.acs()
.filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]})
.only("url", checksum_type, "remote")
.select_related("remote")
)
async for ra in existing_ras.aiterator():
checksum = getattr(ra, checksum_type)
# pick the first occurrence of RA from ACS
if checksum not in existing_ras_dict:
existing_ras_dict[checksum] = {
"remote": ra.remote,
"url": ra.url,
}
# Gather batch d_artifact checksums
batch_checksums = defaultdict(list)
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, cks_type):
batch_checksums[cks_type].append(getattr(d_artifact.artifact, cks_type))

existing_ras_dict = dict()
for checksum_type in batch_checksums.keys():
existing_ras = (
RemoteArtifact.objects.acs()
.filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]})
.only("url", checksum_type, "remote")
.select_related("remote")
)
# TODO: we could probably get rid of this select_related by querying the remotes separately
async for ra in existing_ras.aiterator():
checksum = getattr(ra, checksum_type)
# pick the first occurrence of RA from ACS
if checksum not in existing_ras_dict:
existing_ras_dict[checksum] = {
"remote": ra.remote,
"url": ra.url,
}

for d_content in batch:
for d_artifact in d_content.d_artifacts:
for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, checksum_type):
checksum = getattr(d_artifact.artifact, checksum_type)
if checksum in existing_ras_dict:
d_artifact.urls = [
existing_ras_dict[checksum]["url"]
] + d_artifact.urls
d_artifact.remote = existing_ras_dict[checksum]["remote"]
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, checksum_type):
checksum = getattr(d_artifact.artifact, checksum_type)
if checksum in existing_ras_dict:
d_artifact.urls = [
existing_ras_dict[checksum]["url"]
] + d_artifact.urls
d_artifact.remote = existing_ras_dict[checksum]["remote"]

for d_content in batch:
await self.put(d_content)
10 changes: 6 additions & 4 deletions pulpcore/plugin/stages/declarative_version.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import asyncio
import tempfile

from .api import create_pipeline, EndStage
from .artifact_stages import (
from pulpcore.plugin.models import AlternateContentSource
from pulpcore.plugin.stages.api import create_pipeline, EndStage
from pulpcore.plugin.stages.artifact_stages import (
ACSArtifactHandler,
ArtifactDownloader,
ArtifactSaver,
QueryExistingArtifacts,
RemoteArtifactSaver,
)
from .content_stages import (
from pulpcore.plugin.stages.content_stages import (
ContentAssociation,
ContentSaver,
QueryExistingContents,
ResolveContentFutures,
)
from pulpcore.plugin.util import get_domain_pk


class DeclarativeVersion:
Expand Down Expand Up @@ -131,7 +133,7 @@ def pipeline_stages(self, new_version):
self.first_stage,
QueryExistingArtifacts(),
]
if self.acs:
if self.acs and AlternateContentSource.objects.filter(pulp_domain=get_domain_pk()).exists():
pipeline.append(ACSArtifactHandler())
pipeline.extend(
[
Expand Down

0 comments on commit 9305eb8

Please sign in to comment.