From 70cbed09dc9b82c1f38ae786667ed7a2384f58a4 Mon Sep 17 00:00:00 2001 From: Gerrod Ubben Date: Thu, 6 Feb 2025 16:45:51 -0500 Subject: [PATCH] Remove pulp-smash from test_pull_content --- pulp_container/app/serializers.py | 9 + .../functional/api/test_content_cache.py | 2 +- .../tests/functional/api/test_pull_content.py | 464 ++++++------------ 3 files changed, 153 insertions(+), 322 deletions(-) diff --git a/pulp_container/app/serializers.py b/pulp_container/app/serializers.py index 32ca51778..34c939ec3 100644 --- a/pulp_container/app/serializers.py +++ b/pulp_container/app/serializers.py @@ -159,6 +159,15 @@ class BlobSerializer(SingleArtifactContentSerializer): digest = serializers.CharField(help_text="sha256 of the Blob file") + def __init__(self, *args, **kwargs): + """Fix for bindings to allow for on-demand blobs.""" + # TODO: Move into pulpcore + # This is a fix for the bindings to allow for serializing on-demand blobs. + # There is no create API for blobs, so this doesn't affect the API. + super().__init__(*args, **kwargs) + if "artifact" in self.fields: + self.fields["artifact"].allow_null = True + class Meta: fields = SingleArtifactContentSerializer.Meta.fields + ("digest",) model = models.Blob diff --git a/pulp_container/tests/functional/api/test_content_cache.py b/pulp_container/tests/functional/api/test_content_cache.py index 74401d136..81913bf9d 100644 --- a/pulp_container/tests/functional/api/test_content_cache.py +++ b/pulp_container/tests/functional/api/test_content_cache.py @@ -58,7 +58,7 @@ def fetch_response_metadata(response): return response.status_code, response.headers.get("X-PULP-CACHE") -@pytest.mark.parallel +# @pytest.mark.parallel # Some other parallel test is causing this fail periodically def test_content_cache( container_bindings, container_repository_factory, diff --git a/pulp_container/tests/functional/api/test_pull_content.py b/pulp_container/tests/functional/api/test_pull_content.py index 18702a726..bd81c7fbb 100644 --- a/pulp_container/tests/functional/api/test_pull_content.py +++ b/pulp_container/tests/functional/api/test_pull_content.py @@ -1,51 +1,35 @@ """Tests that verify that images served by Pulp can be pulled.""" -import contextlib +import pytest import hashlib import requests -import unittest -import uuid +import subprocess -from urllib.parse import urljoin, urlparse - -from pulp_smash import api, cli, config, exceptions -from pulp_smash.pulp3.bindings import delete_orphans, monitor_task -from pulp_smash.pulp3.utils import ( - get_content, - gen_distribution, - gen_repo, -) +from urllib.parse import urljoin from pulp_container.tests.functional.utils import ( - core_client, - gen_container_client, - gen_container_remote, get_blobsums_from_remote_registry, get_auth_for_url, ) from pulp_container.tests.functional.constants import ( - CONTAINER_CONTENT_NAME, REGISTRY_V2_REPO_HELLO_WORLD, PULP_HELLO_WORLD_LINUX_TAG, ) from pulp_container.constants import EMPTY_BLOB, EMPTY_JSON, MEDIA_TYPE -from pulpcore.client.pulp_container import ( - ContainerContainerDistribution, - ContainerContainerRepository, - ContainerRepositorySyncURL, - DistributionsContainerApi, - RepositoriesContainerApi, - RemotesContainerApi, -) -from pulpcore.client.pulpcore import ArtifactsApi - -class PullContentTestCase(unittest.TestCase): +class TestPullContent: """Verify whether images served by Pulp can be pulled.""" - @classmethod - def setUpClass(cls): + @pytest.fixture(scope="class") + def setup( + self, + container_repository_factory, + container_remote_factory, + container_sync, + container_distribution_factory, + container_bindings, + ): """Create class-wide variables. 1. Create a repository. @@ -53,94 +37,18 @@ def setUpClass(cls): 3. Sync the repository using the remote and re-read the repo data. 4. Create a container distribution to serve the repository 5. Create another container distribution to the serve the repository version - - This tests targets the following issue: - - * `Pulp #4460 `_ """ - cls.cfg = config.get_config() - cls.registry_name = urlparse(cls.cfg.get_base_url()).netloc - - cls.client = api.Client(cls.cfg, api.code_handler) - client_api = gen_container_client() - cls.repositories_api = RepositoriesContainerApi(client_api) - cls.remotes_api = RemotesContainerApi(client_api) - cls.distributions_api = DistributionsContainerApi(client_api) - - cls.teardown_cleanups = [] - - delete_orphans() - - # defining a shared path will help us to identify whether namespaces and - # matching distributions are correctly determined from within the plugin - random_dist_path = str(uuid.uuid4()) - repo_dist_path = f"{random_dist_path}/{str(uuid.uuid4())}" - repo_ver_dist_path = f"{random_dist_path}/{str(uuid.uuid4())}" - - with contextlib.ExitStack() as stack: - # ensure tearDownClass runs if an error occurs here - stack.callback(cls.tearDownClass) - - # Step 0 - create an empty distribution sharing the same path as real distributions - distribution_response = cls.distributions_api.create( - ContainerContainerDistribution(**gen_distribution(base_path=random_dist_path)) - ) - created_resources = monitor_task(distribution_response.task).created_resources - cls.teardown_cleanups.append((cls.distributions_api.delete, created_resources[0])) - - # Step 1 - _repo = cls.repositories_api.create(ContainerContainerRepository(**gen_repo())) - cls.teardown_cleanups.append((cls.repositories_api.delete, _repo.pulp_href)) - - # Step 2 - cls.remote = cls.remotes_api.create(gen_container_remote()) - cls.teardown_cleanups.append((cls.remotes_api.delete, cls.remote.pulp_href)) - - # Step 3 - sync_data = ContainerRepositorySyncURL(remote=cls.remote.pulp_href) - sync_response = cls.repositories_api.sync(_repo.pulp_href, sync_data) - monitor_task(sync_response.task) - cls.repo = cls.repositories_api.read(_repo.pulp_href) - - # Step 4. - distribution_response = cls.distributions_api.create( - ContainerContainerDistribution( - **gen_distribution(repository=cls.repo.pulp_href, base_path=repo_dist_path) - ) - ) - created_resources = monitor_task(distribution_response.task).created_resources - distribution = cls.distributions_api.read(created_resources[0]) - cls.distribution_with_repo = cls.distributions_api.read(distribution.pulp_href) - cls.teardown_cleanups.append( - (cls.distributions_api.delete, cls.distribution_with_repo.pulp_href) - ) - - # Step 5. - distribution_response = cls.distributions_api.create( - ContainerContainerDistribution( - **gen_distribution( - repository_version=cls.repo.latest_version_href, - base_path=repo_ver_dist_path, - ) - ) - ) - created_resources = monitor_task(distribution_response.task).created_resources - distribution = cls.distributions_api.read(created_resources[0]) - cls.distribution_with_repo_version = cls.distributions_api.read(distribution.pulp_href) - cls.teardown_cleanups.append( - (cls.distributions_api.delete, cls.distribution_with_repo_version.pulp_href) - ) - - # remove callback if everything goes well - stack.pop_all() - - @classmethod - def tearDownClass(cls): - """Clean class-wide variable.""" - for cleanup_function, args in reversed(cls.teardown_cleanups): - cleanup_function(args) - - def test_api_returns_same_checksum(self): + repo = container_repository_factory() + remote = container_remote_factory() + container_sync(repo, remote) + repo = container_bindings.RepositoriesContainerApi.read(repo.pulp_href) + distribution_with_repo = container_distribution_factory(repository=repo.pulp_href) + distribution_with_repo_version = container_distribution_factory( + repository_version=repo.latest_version_href + ) + return repo, distribution_with_repo, distribution_with_repo_version + + def test_api_returns_same_checksum(self, container_bindings, setup): """Verify that pulp serves image with the same checksum of remote. 1. Call pulp repository API and get the content_summary for repo. @@ -148,36 +56,36 @@ def test_api_returns_same_checksum(self): 3. Compare the checksums. """ # Get local checksums for content synced from the remote registry - checksums = [ - content["digest"] - for content in get_content(self.repo.to_dict())[CONTAINER_CONTENT_NAME] - ] + repo, _, _ = setup + blobs = container_bindings.ContentBlobsApi.list(repository_version=repo.latest_version_href) + blob_checksums = {c.digest for c in blobs.results} # Assert that at least one layer is synced from remote:latest # and the checksum matched with remote - self.assertTrue( - any([checksum in checksums for checksum in get_blobsums_from_remote_registry()]), - "Cannot find a matching layer on remote registry.", - ) + assert any( + [checksum in blob_checksums for checksum in get_blobsums_from_remote_registry()] + ), "Cannot find a matching layer on remote registry." - def test_api_performes_schema_conversion(self): + def test_api_performes_schema_conversion(self, bindings_cfg, setup): """Verify pull via token with accepted content type.""" - image_path = "/v2/{}/manifests/{}".format(self.distribution_with_repo.base_path, "latest") - latest_image_url = urljoin(self.cfg.get_base_url(), image_path) + _, distribution_with_repo, _ = setup + image_path = "/v2/{}/manifests/{}".format(distribution_with_repo.base_path, "latest") + latest_image_url = urljoin(bindings_cfg.host, image_path) auth = get_auth_for_url(latest_image_url) content_response = requests.get( latest_image_url, auth=auth, headers={"Accept": MEDIA_TYPE.MANIFEST_V1} ) - with self.assertRaises(requests.exceptions.HTTPError): - content_response.raise_for_status() + # I don't understand what this is testing + assert 400 <= content_response.status_code < 500 - def test_create_empty_blob_on_the_fly(self): + def test_create_empty_blob_on_the_fly(self, bindings_cfg, setup): """ Test if empty blob getscreated and served on the fly. """ - blob_path = "/v2/{}/blobs/{}".format(self.distribution_with_repo.base_path, EMPTY_BLOB) - empty_blob_url = urljoin(self.cfg.get_base_url(), blob_path) + _, distribution_with_repo, _ = setup + blob_path = "/v2/{}/blobs/{}".format(distribution_with_repo.base_path, EMPTY_BLOB) + empty_blob_url = urljoin(bindings_cfg.host, blob_path) auth = get_auth_for_url(empty_blob_url) content_response = requests.get(empty_blob_url, auth=auth) @@ -186,9 +94,9 @@ def test_create_empty_blob_on_the_fly(self): digest = hashlib.sha256(content_response.content).hexdigest() # compare with the digest returned in the response header header_digest = content_response.headers["docker-content-digest"].split(":")[1] - self.assertEqual(digest, header_digest) + assert digest == header_digest - def test_pull_image_from_repository(self): + def test_pull_image_from_repository(self, local_registry, registry_client, setup): """Verify that a client can pull the image from Pulp. 1. Using the RegistryClient pull the image from Pulp. @@ -196,23 +104,17 @@ def test_pull_image_from_repository(self): 3. Verify both images has the same checksum. 4. Ensure image is deleted after the test. """ - registry = cli.RegistryClient(self.cfg) - registry.raise_if_unsupported(unittest.SkipTest, "Test requires podman/docker") - registry.login("-u", "admin", "-p", "password", self.registry_name) - - local_url = urljoin(self.cfg.get_base_url(), self.distribution_with_repo.base_path) + _, distribution_with_repo, _ = setup + local_registry.pull(distribution_with_repo.base_path) + local_image = local_registry.inspect(distribution_with_repo.base_path) - registry.pull(local_url) - self.teardown_cleanups.append((registry.rmi, local_url)) - local_image = registry.inspect(local_url) + registry_client.pull(REGISTRY_V2_REPO_HELLO_WORLD) + remote_image = registry_client.inspect(REGISTRY_V2_REPO_HELLO_WORLD) - registry.pull(REGISTRY_V2_REPO_HELLO_WORLD) - remote_image = registry.inspect(REGISTRY_V2_REPO_HELLO_WORLD) + registry_client.rmi(REGISTRY_V2_REPO_HELLO_WORLD) + assert local_image[0]["Id"] == remote_image[0]["Id"] - self.assertEqual(local_image[0]["Id"], remote_image[0]["Id"]) - registry.rmi(REGISTRY_V2_REPO_HELLO_WORLD) - - def test_pull_image_from_repository_version(self): + def test_pull_image_from_repository_version(self, local_registry, registry_client, setup): """Verify that a client can pull the image from Pulp. 1. Using the RegistryClient pull the image from Pulp. @@ -220,23 +122,17 @@ def test_pull_image_from_repository_version(self): 3. Verify both images has the same checksum. 4. Ensure image is deleted after the test. """ - registry = cli.RegistryClient(self.cfg) - registry.raise_if_unsupported(unittest.SkipTest, "Test requires podman/docker") - registry.login("-u", "admin", "-p", "password", self.registry_name) - - local_url = urljoin(self.cfg.get_base_url(), self.distribution_with_repo_version.base_path) + _, _, distribution_with_repo_version = setup + local_registry.pull(distribution_with_repo_version.base_path) + local_image = local_registry.inspect(distribution_with_repo_version.base_path) - registry.pull(local_url) - self.teardown_cleanups.append((registry.rmi, local_url)) - local_image = registry.inspect(local_url) + registry_client.pull(REGISTRY_V2_REPO_HELLO_WORLD) + remote_image = registry_client.inspect(REGISTRY_V2_REPO_HELLO_WORLD) - registry.pull(REGISTRY_V2_REPO_HELLO_WORLD) - remote_image = registry.inspect(REGISTRY_V2_REPO_HELLO_WORLD) + registry_client.rmi(REGISTRY_V2_REPO_HELLO_WORLD) + assert local_image[0]["Id"] == remote_image[0]["Id"] - self.assertEqual(local_image[0]["Id"], remote_image[0]["Id"]) - registry.rmi(REGISTRY_V2_REPO_HELLO_WORLD) - - def test_pull_image_with_tag(self): + def test_pull_image_with_tag(self, local_registry, registry_client, setup): """Verify that a client can pull the image from Pulp with a tag. 1. Using the RegistryClient pull the image from Pulp specifying a tag. @@ -244,59 +140,58 @@ def test_pull_image_with_tag(self): 3. Verify both images has the same checksum. 4. Ensure image is deleted after the test. """ - registry = cli.RegistryClient(self.cfg) - registry.raise_if_unsupported(unittest.SkipTest, "Test requires podman/docker") - registry.login("-u", "admin", "-p", "password", self.registry_name) - - local_url = ( - urljoin(self.cfg.get_base_url(), self.distribution_with_repo.base_path) - + PULP_HELLO_WORLD_LINUX_TAG + _, distribution_with_repo, _ = setup + local_registry.pull(distribution_with_repo.base_path + PULP_HELLO_WORLD_LINUX_TAG) + local_image = local_registry.inspect( + distribution_with_repo.base_path + PULP_HELLO_WORLD_LINUX_TAG ) - registry.pull(local_url) - self.teardown_cleanups.append((registry.rmi, local_url)) - local_image = registry.inspect(local_url) - - registry.pull(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) - self.teardown_cleanups.append( - (registry.rmi, REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) + registry_client.pull(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) + remote_image = registry_client.inspect( + REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG ) - remote_image = registry.inspect(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) - self.assertEqual(local_image[0]["Id"], remote_image[0]["Id"]) + registry_client.rmi(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) + assert local_image[0]["Id"] == remote_image[0]["Id"] - def test_pull_nonexistent_image(self): + def test_pull_nonexistent_image(self, local_registry): """Verify that a client cannot pull nonexistent image from Pulp. 1. Using the RegistryClient try to pull nonexistent image from Pulp. 2. Assert that error is occurred and nothing has been pulled. """ - registry = cli.RegistryClient(self.cfg) - registry.raise_if_unsupported(unittest.SkipTest, "Test requires podman/docker") - registry.login("-u", "admin", "-p", "password", self.registry_name) + with pytest.raises(subprocess.CalledProcessError): + local_registry.pull("inexistentimagename") - local_url = urljoin(self.cfg.get_base_url(), "inexistentimagename") - with self.assertRaises(exceptions.CalledProcessError): - registry.pull(local_url) - - def test_pull_nonexistent_blob(self): + def test_pull_nonexistent_blob(self, bindings_cfg, setup): """ Verify that a GET request to a nonexistent BLOB will be properly handled instead of outputting a stacktrace. """ - blob_path = "/v2/{}/blobs/{}".format(self.distribution_with_repo.base_path, EMPTY_JSON) - non_existing_blob_url = urljoin(self.cfg.get_base_url(), blob_path) + _, distribution_with_repo, _ = setup + blob_path = "/v2/{}/blobs/{}".format(distribution_with_repo.base_path, EMPTY_JSON) + non_existing_blob_url = urljoin(bindings_cfg.host, blob_path) auth = get_auth_for_url(non_existing_blob_url) content_response = requests.get(non_existing_blob_url, auth=auth) - assert content_response.status_code, 404 + assert content_response.status_code == 404 -class PullOnDemandContentTestCase(unittest.TestCase): +class TestPullOnDemandContent: """Verify whether on-demand served images by Pulp can be pulled.""" - @classmethod - def setUpClass(cls): + @pytest.fixture(scope="class") + def setup( + self, + container_repository_factory, + container_remote_factory, + container_sync, + container_distribution_factory, + pulpcore_bindings, + container_bindings, + registry_client, + monitor_task, + ): """Create class-wide variables and delete orphans. 1. Create a repository. @@ -304,79 +199,23 @@ def setUpClass(cls): 3. Sync the repository using the remote and re-read the repo data. 4. Create a container distribution to serve the repository 5. Create another container distribution to the serve the repository version - - This tests targets the following issue: - - * `Pulp #4460 `_ """ - cls.cfg = config.get_config() - cls.registry_name = urlparse(cls.cfg.get_base_url()).netloc - - client_api = gen_container_client() - cls.repositories_api = RepositoriesContainerApi(client_api) - cls.remotes_api = RemotesContainerApi(client_api) - cls.distributions_api = DistributionsContainerApi(client_api) - - cls.teardown_cleanups = [] - - delete_orphans() - - with contextlib.ExitStack() as stack: - # ensure tearDownClass runs if an error occurs here - stack.callback(cls.tearDownClass) - - # Step 1 - _repo = cls.repositories_api.create(ContainerContainerRepository(**gen_repo())) - cls.teardown_cleanups.append((cls.repositories_api.delete, _repo.pulp_href)) - - # Step 2 - cls.remote = cls.remotes_api.create(gen_container_remote(policy="on_demand")) - cls.teardown_cleanups.append((cls.remotes_api.delete, cls.remote.pulp_href)) - - # Step 3 - sync_data = ContainerRepositorySyncURL(remote=cls.remote.pulp_href) - sync_response = cls.repositories_api.sync(_repo.pulp_href, sync_data) - monitor_task(sync_response.task) - - cls.repo = cls.repositories_api.read(_repo.pulp_href) - cls.artifacts_api = ArtifactsApi(core_client) - cls.artifact_count = cls.artifacts_api.list().count - - # Step 4. - distribution_response = cls.distributions_api.create( - ContainerContainerDistribution(**gen_distribution(repository=cls.repo.pulp_href)) - ) - created_resources = monitor_task(distribution_response.task).created_resources - - distribution = cls.distributions_api.read(created_resources[0]) - cls.distribution_with_repo = cls.distributions_api.read(distribution.pulp_href) - cls.teardown_cleanups.append( - (cls.distributions_api.delete, cls.distribution_with_repo.pulp_href) - ) - - # Step 5. - distribution_response = cls.distributions_api.create( - ContainerContainerDistribution( - **gen_distribution(repository_version=cls.repo.latest_version_href) - ) - ) - created_resources = monitor_task(distribution_response.task).created_resources - distribution = cls.distributions_api.read(created_resources[0]) - cls.distribution_with_repo_version = cls.distributions_api.read(distribution.pulp_href) - cls.teardown_cleanups.append( - (cls.distributions_api.delete, cls.distribution_with_repo_version.pulp_href) - ) - - # remove callback if everything goes well - stack.pop_all() - - @classmethod - def tearDownClass(cls): - """Clean class-wide variable.""" - for cleanup_function, args in reversed(cls.teardown_cleanups): - cleanup_function(args) - - def test_api_returns_same_checksum(self): + monitor_task( + pulpcore_bindings.OrphansCleanupApi.cleanup({"orphan_protection_time": 0}).task + ) + registry_client.rmi("-a", "-f") + repo = container_repository_factory() + remote = container_remote_factory(policy="on_demand") + container_sync(repo, remote) + repo = container_bindings.RepositoriesContainerApi.read(repo.pulp_href) + distribution_with_repo = container_distribution_factory(repository=repo.pulp_href) + distribution_with_repo_version = container_distribution_factory( + repository_version=repo.latest_version_href + ) + artifact_count = pulpcore_bindings.ArtifactsApi.list().count + return repo, distribution_with_repo, distribution_with_repo_version, artifact_count + + def test_api_returns_same_checksum(self, container_bindings, setup): """Verify that pulp serves image with the same checksum of remote. 1. Call pulp repository API and get the content_summary for repo. @@ -384,19 +223,19 @@ def test_api_returns_same_checksum(self): 3. Compare the checksums. """ # Get local checksums for content synced from remote registy - checksums = [ - content["digest"] - for content in get_content(self.repo.to_dict())[CONTAINER_CONTENT_NAME] - ] + repo, _, _, _ = setup + blobs = container_bindings.ContentBlobsApi.list(repository_version=repo.latest_version_href) + blob_checksums = {c.digest for c in blobs.results} # Assert that at least one layer is synced from remote:latest # and the checksum matched with remote - self.assertTrue( - any([checksum in checksums for checksum in get_blobsums_from_remote_registry()]), - "Cannot find a matching layer on remote registry.", - ) + assert any( + [checksum in blob_checksums for checksum in get_blobsums_from_remote_registry()] + ), "Cannot find a matching layer on remote registry." - def test_pull_image_from_repository(self): + def test_pull_image_from_repository( + self, local_registry, registry_client, pulpcore_bindings, setup + ): """Verify that a client can pull the image from Pulp (on-demand). 1. Using the RegistryClient pull the image from Pulp. @@ -405,27 +244,21 @@ def test_pull_image_from_repository(self): 4. Verify that the number of artifacts in Pulp has increased. 5. Ensure image is deleted after the test. """ - registry = cli.RegistryClient(self.cfg) - registry.raise_if_unsupported(unittest.SkipTest, "Test requires podman/docker") - registry.login("-u", "admin", "-p", "password", self.registry_name) + _, distribution_with_repo, _, artifact_count = setup + local_registry.pull(distribution_with_repo.base_path) + local_image = local_registry.inspect(distribution_with_repo.base_path) + registry_client.rmi(local_image[0]["Id"]) - local_url = urljoin(self.cfg.get_base_url(), self.distribution_with_repo.base_path) + registry_client.pull(REGISTRY_V2_REPO_HELLO_WORLD) + remote_image = registry_client.inspect(REGISTRY_V2_REPO_HELLO_WORLD) - registry.pull(local_url) - self.teardown_cleanups.append((registry.rmi, local_url)) - local_image = registry.inspect(local_url) + registry_client.rmi(REGISTRY_V2_REPO_HELLO_WORLD) + assert local_image[0]["Id"] == remote_image[0]["Id"] - registry.pull(REGISTRY_V2_REPO_HELLO_WORLD) - remote_image = registry.inspect(REGISTRY_V2_REPO_HELLO_WORLD) + new_artifact_count = pulpcore_bindings.ArtifactsApi.list().count + assert new_artifact_count > artifact_count - self.assertEqual(local_image[0]["Id"], remote_image[0]["Id"]) - - new_artifact_count = self.artifacts_api.list().count - self.assertGreater(new_artifact_count, self.artifact_count) - - registry.rmi(REGISTRY_V2_REPO_HELLO_WORLD) - - def test_pull_image_from_repository_version(self): + def test_pull_image_from_repository_version(self, local_registry, registry_client, setup): """Verify that a client can pull the image from Pulp (on-demand). 1. Using the RegistryClient pull the image from Pulp. @@ -433,23 +266,18 @@ def test_pull_image_from_repository_version(self): 3. Verify both images has the same checksum. 4. Ensure image is deleted after the test. """ - registry = cli.RegistryClient(self.cfg) - registry.raise_if_unsupported(unittest.SkipTest, "Test requires podman/docker") - registry.login("-u", "admin", "-p", "password", self.registry_name) - - local_url = urljoin(self.cfg.get_base_url(), self.distribution_with_repo_version.base_path) + _, _, distribution_with_repo_version, _ = setup + local_registry.pull(distribution_with_repo_version.base_path) + local_image = local_registry.inspect(distribution_with_repo_version.base_path) + registry_client.rmi(local_image[0]["Id"]) - registry.pull(local_url) - self.teardown_cleanups.append((registry.rmi, local_url)) - local_image = registry.inspect(local_url) + registry_client.pull(REGISTRY_V2_REPO_HELLO_WORLD) + remote_image = registry_client.inspect(REGISTRY_V2_REPO_HELLO_WORLD) - registry.pull(REGISTRY_V2_REPO_HELLO_WORLD) - remote_image = registry.inspect(REGISTRY_V2_REPO_HELLO_WORLD) + registry_client.rmi(REGISTRY_V2_REPO_HELLO_WORLD) + assert local_image[0]["Id"] == remote_image[0]["Id"] - self.assertEqual(local_image[0]["Id"], remote_image[0]["Id"]) - registry.rmi(REGISTRY_V2_REPO_HELLO_WORLD) - - def test_pull_image_with_tag(self): + def test_pull_image_with_tag(self, local_registry, registry_client, setup): """Verify that a client can pull the image from Pulp with a tag (on-demand). 1. Using the RegistryClient pull the image from Pulp specifying a tag. @@ -457,23 +285,17 @@ def test_pull_image_with_tag(self): 3. Verify both images has the same checksum. 4. Ensure image is deleted after the test. """ - registry = cli.RegistryClient(self.cfg) - registry.raise_if_unsupported(unittest.SkipTest, "Test requires podman/docker") - registry.login("-u", "admin", "-p", "password", self.registry_name) - - local_url = ( - urljoin(self.cfg.get_base_url(), self.distribution_with_repo.base_path) - + PULP_HELLO_WORLD_LINUX_TAG + _, distribution_with_repo, _, _ = setup + local_registry.pull(distribution_with_repo.base_path + PULP_HELLO_WORLD_LINUX_TAG) + local_image = local_registry.inspect( + distribution_with_repo.base_path + PULP_HELLO_WORLD_LINUX_TAG ) + registry_client.rmi(local_image[0]["Id"]) - registry.pull(local_url) - self.teardown_cleanups.append((registry.rmi, local_url)) - local_image = registry.inspect(local_url) - - registry.pull(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) - self.teardown_cleanups.append( - (registry.rmi, REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) + registry_client.pull(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) + remote_image = registry_client.inspect( + REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG ) - remote_image = registry.inspect(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) - self.assertEqual(local_image[0]["Id"], remote_image[0]["Id"]) + registry_client.rmi(REGISTRY_V2_REPO_HELLO_WORLD + PULP_HELLO_WORLD_LINUX_TAG) + assert local_image[0]["Id"] == remote_image[0]["Id"]