forked from quay/quay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_proxy_e2e.py
89 lines (73 loc) · 2.94 KB
/
test_proxy_e2e.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import json
import unittest
import pytest
from app import model_cache
from data.database import ProxyCacheConfig, User
from proxy import Proxy, UpstreamRegistryError
@pytest.mark.e2e
class TestProxyE2E(unittest.TestCase):
    """End-to-end checks of ``Proxy`` for manifests and blobs of one upstream repo.

    The tests are marked ``e2e``; they build a ``Proxy`` for ``repo`` on
    ``registry`` and call it directly (presumably over the network -- confirm
    before running in an isolated environment).
    """

    media_type = "application/vnd.docker.distribution.manifest.v2+json"
    registry = "docker.io"
    repo = "library/postgres"
    tag = 14  # image reference expected to exist upstream
    tag_404 = 666  # image reference expected to be missing upstream
    # Digest expected to be missing upstream.
    digest_404 = "sha256:3e23e8160039594a33894f6564e1b1348bbd7a0088d42c4acb73eeaed59c009d"
    digest = None  # set by setup
    proxy = None  # set by setup
    org = None  # NOTE(review): never assigned anywhere in this class

    @staticmethod
    def _first_layer_digest(manifest):
        """Return the digest of the first layer listed in a parsed manifest.

        Schema 1 manifests list layers under ``fsLayers``/``blobSum``; schema 2
        and OCI manifests list them under ``layers``/``digest``. Schema 1 is
        checked first so manifests that carry ``fsLayers`` resolve exactly as
        before.

        Raises:
            KeyError: if the manifest has neither ``fsLayers`` nor ``layers``.
        """
        if "fsLayers" in manifest:
            return manifest["fsLayers"][0]["blobSum"]
        return manifest["layers"][0]["digest"]

    @pytest.fixture(autouse=True)
    def setup(self):
        """Create the proxy under test and resolve a blob digest for the blob tests.

        Declared as an autouse fixture, so it runs before every test method.
        """
        config = ProxyCacheConfig(
            upstream_registry=self.registry,
            organization=User(username="cache-org", organization=True),
        )
        # NOTE(review): these assignments land on the instance, not the class.
        # unittest builds a new instance per test, so the guards below likely
        # never short-circuit -- confirm whether class-level caching was intended.
        if self.proxy is None:
            self.proxy = Proxy(config, self.repo)

        if self.digest is None:
            raw_manifest, _ = self.proxy.get_manifest(image_ref=self.tag)
            self.digest = self._first_layer_digest(json.loads(raw_manifest))

    def test_manifest_exists(self):
        """``manifest_exists`` returns a non-None value for an existing tag."""
        digest = self.proxy.manifest_exists(image_ref=self.tag)
        self.assertIsNotNone(digest)

    def test_manifest_exists_404(self):
        """``manifest_exists`` raises ``UpstreamRegistryError`` for a missing tag."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.manifest_exists(image_ref=self.tag_404)

    def test_get_manifest(self):
        """``get_manifest`` completes without raising for an existing tag."""
        # Any exception propagates and fails the test with its full traceback.
        self.proxy.get_manifest(image_ref=self.tag)

    def test_get_manifest_404(self):
        """``get_manifest`` raises ``UpstreamRegistryError`` for a missing tag."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.get_manifest(image_ref=self.tag_404)

    def test_get_manifest_renews_expired_token(self):
        """``get_manifest`` still succeeds after cached credentials are dropped."""
        if not hasattr(model_cache, "empty_for_testing"):
            # Without a way to empty the cache the scenario cannot be set up;
            # report a skip instead of a silent pass.
            self.skipTest("model_cache has no empty_for_testing(); cannot empty the cache")

        # by clearing the cache and proxy session we force the proxy to
        # re-authenticate against the upstream registry.
        model_cache.empty_for_testing()
        # The header may be absent already; that must not fail the test.
        self.proxy._session.headers.pop("Authorization", None)

        self.proxy.get_manifest(image_ref=self.tag)

    def test_blob_exists(self):
        """``blob_exists`` completes without raising for a digest taken from the manifest."""
        self.proxy.blob_exists(digest=self.digest)

    def test_blob_exists_404(self):
        """``blob_exists`` raises ``UpstreamRegistryError`` for a missing digest."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.blob_exists(digest=self.digest_404)

    def test_get_blob(self):
        """``get_blob`` completes without raising for a digest taken from the manifest."""
        self.proxy.get_blob(self.digest)

    def test_get_blob_404(self):
        """``get_blob`` raises ``UpstreamRegistryError`` for a missing digest."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.get_blob(self.digest_404)