Skip to content

Commit

Permalink
Add pull-through caching
Browse files Browse the repository at this point in the history
closes #507
  • Loading branch information
lubosmj committed Jul 24, 2023
1 parent 8ee26e0 commit c50e570
Show file tree
Hide file tree
Showing 14 changed files with 720 additions and 99 deletions.
3 changes: 3 additions & 0 deletions CHANGES/507.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added support for pull-through caching. Users can now create a distribution with a remote pointing
to a remote registry without specifying the upstream name and Pulp automatically downloads missing
content and acts as a smart proxy.
39 changes: 39 additions & 0 deletions pulp_container/app/downloaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
import asyncio
import json
import re
import tempfile

from aiohttp.client_exceptions import ClientResponseError
from logging import getLogger
from multidict import MultiDict
from urllib import parse

from django.conf import settings

from pulpcore.plugin.models import Artifact, Task
from pulpcore.plugin.download import DownloaderFactory, HttpDownloader
from pulpcore.plugin.pulp_hashlib import new as pulp_hashlib_new

from pulp_container.constants import V2_ACCEPT_HEADERS

Expand Down Expand Up @@ -94,14 +99,48 @@ async def _run(self, handle_401=True, extra_data=None):
return await self._run(handle_401=False, extra_data=extra_data)
else:
raise

to_return = await self._handle_response(response)

await response.release()
self.response_headers = response.headers

if self._close_session_on_finalize:
self.session.close()
return to_return

def _ensure_writer_has_open_file(self):
"""
Create a temporary file on demand.
Create a temporary file when it's actually used, allowing plugin writers to instantiate
many downloaders in memory.
This method sets the path of NamedTemporaryFile dynamically based on whether it is running
from a task or not. Otherwise, permission errors might be raised when Pulp is trying to
download a file from api-app and write to a user space.
"""
if not self._writer:
dir_path = settings.WORKING_DIRECTORY if Task.current() is None else "."
self._writer = tempfile.NamedTemporaryFile(dir=dir_path, delete=False)
self.path = self._writer.name
self._digests = {n: pulp_hashlib_new(n) for n in Artifact.DIGEST_FIELDS}
self._size = 0

def fetch(self, extra_data=None):
"""
Run the download synchronously with additional data and return the `DownloadResult`.
Returns:
:class:`~pulpcore.plugin.download.DownloadResult`
or :class:`~aiohttp.client.ClientResponse`
Raises:
Exception: Any fatal exception emitted during downloading
"""
done, _ = asyncio.get_event_loop().run_until_complete(self.run(extra_data=extra_data))
return done.pop().result()

async def update_token(self, response_auth_header, used_token, repo_name):
"""
Update the Bearer token to be used with all requests.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Generated by Django 4.2.2 on 2023-06-15 09:50

from django.db import migrations, models
import django.db.models.deletion
import pulpcore.app.models.access_policy


class Migration(migrations.Migration):

dependencies = [
('core', '0107_distribution_hidden'),
('container', '0036_containerpushrepository_pending_blobs_manifests'),
]

operations = [
migrations.CreateModel(
name='ContainerPullThroughDistribution',
fields=[
('distribution_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.distribution')),
('private', models.BooleanField(default=False, help_text='Restrict pull access to explicitly authorized users. Defaults to unrestricted pull access.')),
],
options={
'default_related_name': '%(app_label)s_%(model_name)s',
},
bases=('core.distribution', pulpcore.app.models.access_policy.AutoAddObjPermsMixin),
),
migrations.CreateModel(
name='ContainerPullThroughRemote',
fields=[
('remote_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.remote')),
('upstream_name', models.TextField(db_index=True)),
],
options={
'default_related_name': '%(app_label)s_%(model_name)s',
},
bases=('core.remote', pulpcore.app.models.access_policy.AutoAddObjPermsMixin),
),
migrations.AddField(
model_name='containerrepository',
name='pending_blobs',
field=models.ManyToManyField(related_name='pending_blobs', to='container.blob'),
),
migrations.AddField(
model_name='containerrepository',
name='pending_manifests',
field=models.ManyToManyField(to='container.manifest'),
),
migrations.AddField(
model_name='containerrepository',
name='pending_tags',
field=models.ManyToManyField(to='container.tag'),
),
migrations.AddField(
model_name='containerrepository',
name='remaining_blobs',
field=models.ManyToManyField(related_name='remaining_blobs', to='container.blob'),
),
migrations.AddField(
model_name='containerdistribution',
name='pull_through_distribution',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='distributions', to='container.containerpullthroughdistribution'),
),
]
137 changes: 137 additions & 0 deletions pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tempfile
import time
from logging import getLogger
from pathlib import PurePath

from django.db import models
from django.conf import settings
Expand Down Expand Up @@ -105,6 +106,10 @@ class Manifest(Content):
through_fields=("image_manifest", "manifest_list"),
)

@staticmethod
def init_from_artifact_and_relative_path(artifact, relative_path):
pass

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
unique_together = ("digest",)
Expand Down Expand Up @@ -401,6 +406,33 @@ def namespaced_upstream_name(self):
else:
return self.upstream_name

def get_remote_artifact_url(self, relative_path=None, request=None):
"""
TODO: ensure that the functionality is not affected by keywords included within the path
"""
if "manifests" in request.path:
if "tag_name" in request.match_info:
tag_name = request.match_info["tag_name"]
return os.path.join(self.url, "v2", relative_path, "manifests", tag_name)
elif "digest" in request.match_info:
digest = "sha256:{digest}".format(digest=request.match_info["digest"])
return os.path.join(self.url, "v2", relative_path, "manifests", digest)
elif "blobs" in request.path:
digest = "sha256:{digest}".format(digest=request.match_info["digest"])
return os.path.join(self.url, "v2", relative_path, "blobs", digest)

def get_remote_artifact_content_type(self, relative_path=None):
"""
TODO: re-evaluate the need of this method
"""
if relative_path:
type_path = PurePath(relative_path)
if type_path.match("manifests/.*"):
return Manifest
elif type_path.match("blobs/.*"):
return Blob
return None

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
permissions = [
Expand All @@ -411,6 +443,68 @@ class Meta:
]


class ContainerPullThroughRemote(Remote, AutoAddObjPermsMixin):
"""
TODO: Add permissions.
"""

TYPE = "pull-through"

upstream_name = models.TextField(db_index=True)

@property
def download_factory(self):
"""
Downloader Factory that maps to custom downloaders which support registry auth.
Upon first access, the DownloaderFactory is instantiated and saved internally.
Returns:
DownloadFactory: The instantiated DownloaderFactory to be used by
get_downloader()
"""
try:
return self._download_factory
except AttributeError:
self._download_factory = DownloaderFactory(
self,
downloader_overrides={
"http": downloaders.RegistryAuthHttpDownloader,
"https": downloaders.RegistryAuthHttpDownloader,
},
)
return self._download_factory

def get_downloader(self, remote_artifact=None, url=None, **kwargs):
"""
Get a downloader from either a RemoteArtifact or URL that is configured with this Remote.
This method accepts either `remote_artifact` or `url` but not both. At least one is
required. If neither or both are passed a ValueError is raised.
Args:
remote_artifact (:class:`~pulpcore.app.models.RemoteArtifact`): The RemoteArtifact to
download.
url (str): The URL to download.
kwargs (dict): This accepts the parameters of
:class:`~pulpcore.plugin.download.BaseDownloader`.
Raises:
ValueError: If neither remote_artifact and url are passed, or if both are passed.
Returns:
subclass of :class:`~pulpcore.plugin.download.BaseDownloader`: A downloader that
is configured with the remote settings.
"""
kwargs["remote"] = self
return super().get_downloader(remote_artifact=remote_artifact, url=url, **kwargs)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"


class ManifestSigningService(SigningService):
"""
Signing service used for creating container signatures.
Expand Down Expand Up @@ -485,6 +579,13 @@ class ContainerRepository(
ManifestSigningService, on_delete=models.SET_NULL, null=True
)

# temporary relations used for uncommitted pull-through cache operations
pending_tags = models.ManyToManyField(Tag)
pending_manifests = models.ManyToManyField(Manifest)
pending_blobs = models.ManyToManyField(Blob, related_name="pending_blobs")
# digests of remaining blobs to be attached to pending manifests
remaining_blobs = models.ManyToManyField(Blob, related_name="remaining_blobs")

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
permissions = [
Expand All @@ -507,6 +608,16 @@ def finalize_new_version(self, new_version):
"""
remove_duplicates(new_version)
validate_repo_version(new_version)
self.remove_pending_content(new_version)

def remove_pending_content(self, repository_version):
"""Remove pending blobs and manifests when committing the content to the repository."""
added_content = repository_version.added(
base_version=repository_version.base_version
).values_list("pk")
self.pending_tags.remove(*Tag.objects.filter(pk__in=added_content))
self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content))
self.pending_blobs.remove(*Blob.objects.filter(pk__in=added_content))


class ContainerPushRepository(Repository, AutoAddObjPermsMixin):
Expand Down Expand Up @@ -563,6 +674,25 @@ def remove_pending_content(self, repository_version):
self.pending_manifests.remove(*Manifest.objects.filter(pk__in=added_content))


class ContainerPullThroughDistribution(Distribution, AutoAddObjPermsMixin):
"""
TODO: Add permissions.
"""

TYPE = "pull-through"

private = models.BooleanField(
default=False,
help_text=_(
"Restrict pull access to explicitly authorized users. "
"Defaults to unrestricted pull access."
),
)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"


class ContainerDistribution(Distribution, AutoAddObjPermsMixin):
"""
A container distribution defines how a repository version is distributed by Pulp's webserver.
Expand Down Expand Up @@ -593,6 +723,13 @@ class ContainerDistribution(Distribution, AutoAddObjPermsMixin):
)
description = models.TextField(null=True)

pull_through_distribution = models.ForeignKey(
ContainerPullThroughDistribution,
related_name="distributions",
on_delete=models.CASCADE,
null=True,
)

def get_repository_version(self):
"""
Returns the repository version that is supposed to be served by this ContainerDistribution.
Expand Down
6 changes: 6 additions & 0 deletions pulp_container/app/redirects.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def issue_blob_redirect(self, blob):
"""
return self.redirect_to_content_app("blobs", blob.digest)

def issue_pull_through_manifests_redirect(self, pk):
return self.redirect_to_content_app("manifests", pk)

def issue_pull_through_blobs_redirect(self, pk):
return self.redirect_to_content_app("blobs", pk)


class S3StorageRedirects(CommonRedirects):
"""
Expand Down
Loading

0 comments on commit c50e570

Please sign in to comment.