Optimize ACS stage #4274

Draft · wants to merge 3 commits into base: main
74 changes: 32 additions & 42 deletions pulpcore/plugin/stages/artifact_stages.py
@@ -5,11 +5,10 @@

 from aiofiles import os as aos
 from asgiref.sync import sync_to_async
-from django.db.models import Prefetch, prefetch_related_objects, Q
+from django.db.models import Prefetch, prefetch_related_objects
 
 from pulpcore.plugin.exceptions import UnsupportedDigestValidationError
 from pulpcore.plugin.models import (
-    AlternateContentSource,
     Artifact,
     ContentArtifact,
     ProgressReport,
@@ -472,52 +471,43 @@ class ACSArtifactHandler(Stage):

     async def run(self):
         async for batch in self.batches():
-            acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain)
-            acs_exists = await acs_query.aexists()
-            if acs_exists:
-                # Gather batch d_artifact checksums
-                batch_checksums = defaultdict(list)
-                for d_content in batch:
-                    for d_artifact in d_content.d_artifacts:
-                        for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
-                            if getattr(d_artifact.artifact, cks_type):
-                                batch_checksums[cks_type].append(
-                                    getattr(d_artifact.artifact, cks_type)
-                                )
-
-                batch_query = Q()
-                for checksum_type in batch_checksums.keys():
-                    batch_query.add(
-                        Q(**{f"{checksum_type}__in": batch_checksums[checksum_type]}), Q.OR
-                    )
+            # Gather batch d_artifact checksums
+            batch_checksums = defaultdict(list)
+            for d_content in batch:
+                for d_artifact in d_content.d_artifacts:
+                    for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
+                        if getattr(d_artifact.artifact, cks_type):
+                            batch_checksums[cks_type].append(getattr(d_artifact.artifact, cks_type))
 
+            existing_ras_dict = dict()
+            for checksum_type in batch_checksums.keys():
                 existing_ras = (
                     RemoteArtifact.objects.acs()
-                    .filter(batch_query)
-                    .only("url", "remote")
+                    .filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]})
+                    .only("url", checksum_type, "remote")
                     .select_related("remote")
                 )
-                existing_ras_dict = dict()
-                async for ra in existing_ras:
-                    for c_type in Artifact.COMMON_DIGEST_FIELDS:
-                        checksum = await sync_to_async(getattr)(ra, c_type)
-                        # pick the first occurence of RA from ACS
-                        if checksum and checksum not in existing_ras_dict:
-                            existing_ras_dict[checksum] = {
-                                "remote": ra.remote,
-                                "url": ra.url,
-                            }
+                # todo: we could probably get rid of this select_related by separating
+                # out the remote query
+                async for ra in existing_ras.aiterator():
+                    checksum = getattr(ra, checksum_type)
+                    # pick the first occurence of RA from ACS
+                    if checksum not in existing_ras_dict:
+                        existing_ras_dict[checksum] = {
+                            "remote": ra.remote,
+                            "url": ra.url,
+                        }

-                for d_content in batch:
-                    for d_artifact in d_content.d_artifacts:
-                        for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
-                            if getattr(d_artifact.artifact, checksum_type):
-                                checksum = getattr(d_artifact.artifact, checksum_type)
-                                if checksum in existing_ras_dict:
-                                    d_artifact.urls = [
-                                        existing_ras_dict[checksum]["url"]
-                                    ] + d_artifact.urls
-                                    d_artifact.remote = existing_ras_dict[checksum]["remote"]
+            for d_content in batch:
+                for d_artifact in d_content.d_artifacts:
+                    for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
+                        if getattr(d_artifact.artifact, checksum_type):
+                            checksum = getattr(d_artifact.artifact, checksum_type)
+                            if checksum in existing_ras_dict:
+                                d_artifact.urls = [
+                                    existing_ras_dict[checksum]["url"]
+                                ] + d_artifact.urls
+                                d_artifact.remote = existing_ras_dict[checksum]["remote"]
 
             for d_content in batch:
                 await self.put(d_content)
Review comment from @ipanova (Member), Aug 23, 2023, on the d_artifact.urls assignment:

Bringing back for transparency some of the matrix conversation @dralley and I had. I see a few issues with the current design:

1) The original intention for this line was for the declarative artifact to accept a list of URLs that the downloader would try in a loop. Those URLs would come from a mirrorlist, and the remote would either need no credentials at all or the same credentials for every URL in that list. I am not sure how worthwhile it is for the ACS stage to build a list of URLs: it would be a mix, one URL coming from the original remote, most likely needing credentials, and another from the ACS (with or without credentials, or different credentials?). The remote that is actually going to be used is the one from the ACS. So maybe this needs to be reverted back to d_artifact.url = existing_ras_dict[checksum]["url"].

2) Another problem is that if the ArtifactDownloader stage fails (namely, the download of binary data from the substituted ACS source), there is no fallback to the original remote downloader + URL. Under normal circumstances the ACS source is expected to be available, since that is the preferred and fastest way to get the binary data, but no one can guarantee normal circumstances 100% of the time.

3) We should first iterate through all available ACSes to download from and only then fall back to the original remote, but instead we provide just one (this is currently a pipeline limitation, where a declarative artifact can reference just one remote).

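A hypothetical sketch of the fallback ordering that points 2) and 3) argue for (the names acs_hits and download_with_fallback are assumptions; this is not how the current single-remote pipeline behaves): try every ACS candidate first, keep the original remote as the last resort, and pair each URL with the remote whose credentials match it, which would also address the mixed-credentials concern in point 1).

# Hypothetical sketch: each candidate carries its own remote, so the
# downloader is always built with credentials that match the URL being tried.
async def download_with_fallback(d_artifact, acs_hits):
    candidates = [(ra.url, ra.remote) for ra in acs_hits]  # all ACS sources first
    candidates.append((d_artifact.url, d_artifact.remote))  # original remote last
    for url, remote in candidates:
        downloader = remote.get_downloader(url=url)
        try:
            return await downloader.run()  # stop at the first source that works
        except Exception:
            continue  # this source failed; fall back to the next one
    raise RuntimeError("all ACS sources and the original remote failed")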
Comment on lines +507 to +510:

Contributor:

After reading through the code I am mega-confused about how ACS even works, and reading the workflow docs did not clear it up for me: https://docs.pulpproject.org/pulpcore/workflows/alternate-content-sources.html

I think there is a big problem with just overwriting the d_artifact.remote field with one from an existing RA that could belong to a different remote! It means all the current urls on the d_artifact could be rendered useless, since they won't have the correct remote to build the downloader from. I have a feeling this feature is not working entirely how we intend it to.

Contributor (Author):

Yeah, this isn't my strong suit either. If I understand correctly, the basic idea is to preferentially use a different remote instead of the original one for downloads. What might make it OK is that when it comes to saving remote artifacts, it ultimately ends up saving new remote artifacts rather than modifying existing ones. But in terms of the download process, it will be the new ones that are used?

I think! It would be worthwhile to verify those assumptions and document the actual details while I'm working on this code anyway.

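A minimal sketch of the assumption in that reply (save_remote_artifact_sketch is a hypothetical helper, not the real RemoteArtifactSaver): if remote artifacts are looked up by the (content_artifact, remote) pair, then a d_artifact whose remote was substituted by ACSArtifactHandler misses the original remote's row and creates a fresh RemoteArtifact instead of mutating an existing one.

from pulpcore.plugin.models import RemoteArtifact

async def save_remote_artifact_sketch(d_artifact, content_artifact):
    # Hypothetical sketch: keyed on (content_artifact, remote), so a swapped
    # d_artifact.remote produces a new row for the ACS remote and leaves the
    # original remote's RemoteArtifact untouched.
    await RemoteArtifact.objects.aget_or_create(
        content_artifact=content_artifact,
        remote=d_artifact.remote,
        defaults={"url": d_artifact.urls[0]},
    )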
@ggainey (Contributor), Aug 21, 2023:

There's a lot of discussion in the original plan.io issue: https://pulp.plan.io/issues/7832

And some questions/answers in the hackmd: https://hackmd.io/@pulp/acs

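To make the query change in ACSArtifactHandler.run() concrete, here is a small illustration (hypothetical digest values; only the filter shape matters) of what the stage used to ask the database versus what it asks now. The new shape also pairs naturally with QuerySet.aiterator(), which streams rows without populating the queryset cache.

from django.db.models import Q

batch_checksums = {
    "sha256": ["aaa...", "bbb..."],  # hypothetical digests
    "sha512": ["ccc..."],
}

# Before: one query ORing every digest column together, then a Python-side
# re-scan of all COMMON_DIGEST_FIELDS for every returned row.
batch_query = Q()
for checksum_type, values in batch_checksums.items():
    batch_query.add(Q(**{f"{checksum_type}__in": values}), Q.OR)
# RemoteArtifact.objects.acs().filter(batch_query)

# After: one narrower query per checksum type; each returned row is matched
# on the single column it was fetched by, so no per-row digest loop is needed.
# for checksum_type, values in batch_checksums.items():
#     RemoteArtifact.objects.acs().filter(**{f"{checksum_type}__in": values})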
10 changes: 6 additions & 4 deletions pulpcore/plugin/stages/declarative_version.py
@@ -1,20 +1,22 @@
 import asyncio
 import tempfile
 
-from .api import create_pipeline, EndStage
-from .artifact_stages import (
+from pulpcore.plugin.models import AlternateContentSource
+from pulpcore.plugin.stages.api import create_pipeline, EndStage
+from pulpcore.plugin.stages.artifact_stages import (
     ACSArtifactHandler,
     ArtifactDownloader,
     ArtifactSaver,
     QueryExistingArtifacts,
     RemoteArtifactSaver,
 )
-from .content_stages import (
+from pulpcore.plugin.stages.content_stages import (
     ContentAssociation,
     ContentSaver,
     QueryExistingContents,
     ResolveContentFutures,
 )
+from pulpcore.plugin.util import get_domain_pk


 class DeclarativeVersion:
@@ -131,7 +133,7 @@ def pipeline_stages(self, new_version):
             self.first_stage,
             QueryExistingArtifacts(),
         ]
-        if self.acs:
+        if self.acs and AlternateContentSource.objects.filter(pulp_domain=get_domain_pk()).exists():
             pipeline.append(ACSArtifactHandler())
         pipeline.extend(
             [
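The guard above means the ACS existence check now runs once, when the pipeline is assembled, instead of once per batch inside ACSArtifactHandler.run(). For context, a hedged sketch of how a plugin sync would opt into this path (MyPluginFirstStage and the surrounding variables are hypothetical):

from pulpcore.plugin.stages import DeclarativeVersion

# acs=True only requests ACS handling; with this change the stage is actually
# added to the pipeline only when the domain contains at least one
# AlternateContentSource, so syncs without ACSes skip the stage entirely.
first_stage = MyPluginFirstStage(remote)  # hypothetical plugin stage
DeclarativeVersion(first_stage, repository, acs=True).create()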