Skip to content

Commit

Permalink
temp3
Browse files Browse the repository at this point in the history
[noissue]
  • Loading branch information
dralley committed Aug 14, 2023
1 parent e9d81e5 commit 4b1930d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 44 deletions.
74 changes: 34 additions & 40 deletions pulpcore/plugin/stages/artifact_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from pulpcore.plugin.exceptions import UnsupportedDigestValidationError
from pulpcore.plugin.models import (
AlternateContentSource,
Artifact,
ContentArtifact,
ProgressReport,
Expand Down Expand Up @@ -465,47 +464,42 @@ class ACSArtifactHandler(Stage):
"""

async def run(self):
acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain)
acs_exists = await acs_query.aexists()
async for batch in self.batches():
if acs_exists:
# Gather batch d_artifact checksums
batch_checksums = defaultdict(list)
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, cks_type):
batch_checksums[cks_type].append(
getattr(d_artifact.artifact, cks_type)
)

existing_ras_dict = dict()
for checksum_type in batch_checksums.keys():
existing_ras = (
RemoteArtifact.objects.acs()
.filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]})
.only("url", checksum_type, "remote")
.select_related("remote")
)
async for ra in existing_ras.aiterator():
checksum = getattr(ra, checksum_type)
# pick the first occurence of RA from ACS
if checksum not in existing_ras_dict:
existing_ras_dict[checksum] = {
"remote": ra.remote,
"url": ra.url,
}
# Gather batch d_artifact checksums
batch_checksums = defaultdict(list)
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, cks_type):
batch_checksums[cks_type].append(getattr(d_artifact.artifact, cks_type))

existing_ras_dict = dict()
for checksum_type in batch_checksums.keys():
existing_ras = (
RemoteArtifact.objects.acs()
.filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]})
.only("url", checksum_type, "remote")
.select_related("remote")
)
async for ra in existing_ras.aiterator():
checksum = getattr(ra, checksum_type)
# pick the first occurence of RA from ACS
if checksum not in existing_ras_dict:
existing_ras_dict[checksum] = {
"remote": ra.remote,
"url": ra.url,
}

for d_content in batch:
for d_artifact in d_content.d_artifacts:
for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, checksum_type):
checksum = getattr(d_artifact.artifact, checksum_type)
if checksum in existing_ras_dict:
d_artifact.urls = [
existing_ras_dict[checksum]["url"]
] + d_artifact.urls
d_artifact.remote = existing_ras_dict[checksum]["remote"]
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, checksum_type):
checksum = getattr(d_artifact.artifact, checksum_type)
if checksum in existing_ras_dict:
d_artifact.urls = [
existing_ras_dict[checksum]["url"]
] + d_artifact.urls
d_artifact.remote = existing_ras_dict[checksum]["remote"]

for d_content in batch:
await self.put(d_content)
9 changes: 5 additions & 4 deletions pulpcore/plugin/stages/declarative_version.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import asyncio
import tempfile

from .api import create_pipeline, EndStage
from .artifact_stages import (
from pulpcore.plugin.models import AlternateContentSource
from pulpcore.plugin.stages.api import create_pipeline, EndStage
from pulpcore.plugin.stages.artifact_stages import (
ACSArtifactHandler,
ArtifactDownloader,
ArtifactSaver,
QueryExistingArtifacts,
RemoteArtifactSaver,
)
from .content_stages import (
from pulpcore.plugin.stages.content_stages import (
ContentAssociation,
ContentSaver,
QueryExistingContents,
Expand Down Expand Up @@ -131,7 +132,7 @@ def pipeline_stages(self, new_version):
self.first_stage,
QueryExistingArtifacts(),
]
if self.acs:
if self.acs and AlternateContentSource.objects.filter(pulp_domain=self.domain).exists():
pipeline.append(ACSArtifactHandler())
pipeline.extend(
[
Expand Down

0 comments on commit 4b1930d

Please sign in to comment.