Skip to content

Commit

Permalink
Check a list of scopes when verifying tokens
Browse files Browse the repository at this point in the history
Before this commit, the registry checked one scope, ignoring other
scopes that could relate to blob mounting operations. Due to that,
users without sufficient permissions could mount blobs from other
users unauthorized.

closes pulp#1286
  • Loading branch information
lubosmj committed Jul 21, 2023
1 parent bbf9222 commit d23b907
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGES/1286.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a security issue that allowed users without sufficient permissions to mount blobs.
48 changes: 30 additions & 18 deletions pulp_container/app/token_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,24 @@ def _decode_token(encoded_token, request):
return decoded_token


def _contains_accessible_actions(decoded_token, scope):
def _contains_accessible_actions(decoded_token, scopes):
"""
Check if a client has permission to perform operations within the current scope
Check if a client has permission to perform operations within the current scope.
Furthermore, it is necessary to check a compounded scope; for instance, when
performing blob mounting, there are two access scopes considered.
"""
for access in decoded_token["access"]:
if scope.resource_type == access["type"] and scope.name == access["name"]:
if scope.action in access["actions"]:
return True
accessible_actions = []
for scope in scopes:
for access in decoded_token["access"]:
if scope.resource_type == access["type"] and scope.name == access["name"]:
if scope.action in access["actions"]:
accessible_actions.append(True)
break
else:
accessible_actions.append(False)

return False
return all(accessible_actions)


class RegistryAuthentication(BasicAuthentication):
Expand Down Expand Up @@ -141,25 +149,29 @@ def authenticate_header(self, request):
realm = settings.TOKEN_SERVER
authenticate_string = f'{self.keyword} realm="{realm}",service="{request.get_host()}"'

scope = get_scope(request)
if scope is not None:
scopes = get_scopes(request)
for scope in scopes:
authenticate_string += f',scope="{scope.resource_type}:{scope.name}:{scope.action}"'

return authenticate_string


def get_scope(request):
def get_scopes(request):
"""
Return an initialized scope object based on the passed request's data.
Return a list of scope objects based on the passed request's data.
"""
scopes = []

path = request.resolver_match.kwargs.get("path", "")
if path:
action = "pull" if request.method in SAFE_METHODS else "push"
return Scope("repository", path, action)
scopes.append(Scope("repository", path, action))
if request.query_params.keys() == {"from", "mount"}:
scopes.append(Scope("repository", request.query_params["from"], "pull"))
elif request.path == "/v2/_catalog":
return Scope("registry", "catalog", "*")
elif request.path == "/v2/":
return None
scopes.append(Scope("registry", "catalog", "*"))

return scopes


class RegistryPermission(BasePermission):
Expand Down Expand Up @@ -196,13 +208,13 @@ def has_permission(self, request, view):
if decoded_token is None:
raise NotAuthenticated()

scope = get_scope(request)
if scope is None:
scopes = get_scopes(request)
if not scopes:
is_requesting_root_endpoint = len(decoded_token["access"]) == 0
if is_requesting_root_endpoint:
return True
else:
if _contains_accessible_actions(decoded_token, scope):
if _contains_accessible_actions(decoded_token, scopes):
return True

raise AuthenticationFailed(detail="Insufficient permissions", code="insufficient_scope")
67 changes: 42 additions & 25 deletions pulp_container/tests/functional/api/test_rbac_repo_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,11 @@ def test_cross_repository_blob_mount(
container_distribution_api,
container_namespace_api,
container_blob_api,
gen_object_with_cleanup,
):
"""Test that users can cross mount blobs from different repositories."""
if TOKEN_AUTH_DISABLED:
pytest.skip("Cannot test blob mounting without token authentication.")

source_repo = utils.uuid4()
dest_repo = utils.uuid4()
Expand All @@ -210,6 +213,9 @@ def test_cross_repository_blob_mount(
repository = container_push_repository_api.list(name=source_repo).results[0]
blobs = container_blob_api.list(repository_version=repository.latest_version_href).results
distribution = container_distribution_api.list(name=source_repo).results[0]
monitor_task(
container_distribution_api.partial_update(distribution.pulp_href, {"private": True}).task
)

# Cleanup
add_to_cleanup(container_namespace_api, distribution.namespace)
Expand All @@ -231,29 +237,42 @@ def test_cross_repository_blob_mount(
except ValueError:
pass

if not TOKEN_AUTH_DISABLED:
user_consumer = gen_user(
object_roles=[("container.containernamespace_consumer", distribution.namespace)]
)
user_collaborator = gen_user(
object_roles=[("container.containernamespace_collaborator", distribution.namespace)]
)
user_helpless = gen_user()

# Test if a user with pull permission, but not push permission, is not able to mount.
with user_consumer:
content_response, _ = mount_blob(blobs[0], source_repo, dest_repo)
assert content_response.status_code == 401

# Test if a collaborator cannot mount content outside his scope.
with user_collaborator:
content_response, _ = mount_blob(blobs[0], source_repo, dest_repo)
assert content_response.status_code == 401
user_consumer = gen_user(
object_roles=[("container.containernamespace_consumer", distribution.namespace)]
)
user_collaborator = gen_user(
object_roles=[("container.containernamespace_collaborator", distribution.namespace)]
)
user_helpless = gen_user()

# Test if an anonymous user with no permissions is not able to mount.
with user_helpless:
content_response, _ = mount_blob(blobs[0], source_repo, dest_repo)
assert content_response.status_code == 401
# Test if a user with pull permission, but not push permission, is not able to mount.
with user_consumer:
content_response, _ = mount_blob(blobs[0], source_repo, dest_repo)
assert content_response.status_code == 401

# Test if a collaborator cannot mount content outside his scope.
with user_collaborator:
content_response, _ = mount_blob(blobs[0], source_repo, dest_repo)
assert content_response.status_code == 401

# Test if an anonymous user with no permissions is not able to mount.
with user_helpless:
content_response, _ = mount_blob(blobs[0], source_repo, dest_repo)
assert content_response.status_code == 401

# Test if an owner of another namespace cannot utilize the blob mounting because of
# insufficient permissions
dest_tester_namespace = utils.uuid4()
tester_namespace = gen_object_with_cleanup(
container_namespace_api, {"name": dest_tester_namespace}
)
tester_owner = gen_user(
object_roles=[("container.containernamespace_owner", tester_namespace.pulp_href)]
)
with tester_owner:
local_registry.tag_and_push(image_path, f"{tester_namespace.name}/test:manifest_a")
content_response, auth = mount_blob(blobs[0], source_repo, f"{dest_tester_namespace}/test")
assert content_response.status_code == 401


@pytest.fixture
Expand All @@ -262,9 +281,7 @@ def mount_blob(local_registry, container_distribution_api, container_namespace_a

def _mount_blob(blob, source, dest):
mount_path = f"/v2/{dest}/blobs/uploads/?from={source}&mount={blob.digest}"
response, auth = local_registry.get_response(
"POST", mount_path, scope=f"repository:{source}:pull"
)
response, auth = local_registry.get_response("POST", mount_path)

if response.status_code == 201:
distribution = container_distribution_api.list(name=dest).results[0]
Expand Down
4 changes: 2 additions & 2 deletions pulp_container/tests/functional/api/test_repositories_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def get_listed_repositories(self, auth=None):
authenticate_header = content_response.headers["Www-Authenticate"]

queries = AuthenticationHeaderQueries(authenticate_header)
self.assertEqual(queries.scope, "registry:catalog:*")
self.assertEqual(queries.scopes, ["registry:catalog:*"])

content_response = requests.get(
queries.realm, params={"service": queries.service, "scope": queries.scope}, auth=auth
queries.realm, params={"service": queries.service, "scope": queries.scopes}, auth=auth
)
content_response.raise_for_status()

Expand Down
8 changes: 1 addition & 7 deletions pulp_container/tests/functional/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ def get_response(method, path, **kwargs):
if TOKEN_AUTH_DISABLED:
auth = basic_auth
else:
extended_scope = kwargs.pop("scope", None)

with pytest.raises(requests.HTTPError):
response = requests.request(method, url, auth=basic_auth, **kwargs)
response.raise_for_status()
Expand All @@ -166,13 +164,9 @@ def get_response(method, path, **kwargs):
authenticate_header = response.headers["WWW-Authenticate"]
queries = AuthenticationHeaderQueries(authenticate_header)

scopes = [queries.scope]
if extended_scope:
scopes.append(extended_scope)

content_response = requests.get(
queries.realm,
params={"service": queries.service, "scope": scopes},
params={"service": queries.service, "scope": queries.scopes},
auth=basic_auth,
)
content_response.raise_for_status()
Expand Down
9 changes: 7 additions & 2 deletions pulp_container/tests/functional/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,16 @@ def __init__(self, authenticate_header):
The scope is not provided by the token server because we are accessing the endpoint from
the root.
"""
self.scopes = []

if not authenticate_header.lower().startswith("bearer "):
raise Exception(f"Authentication header has wrong format.\n{authenticate_header}")
for item in authenticate_header[7:].split(","):
key, value = item.split("=")
setattr(self, key, value.strip('"'))
if key == "scope":
self.scopes.append(value.strip('"'))
else:
setattr(self, key, value.strip('"'))


skip_if = partial(selectors.skip_if, exc=SkipTest)
Expand Down Expand Up @@ -257,7 +262,7 @@ def get_auth_for_url(registry_endpoint_url, auth=None):
authenticate_header = response.headers["WWW-Authenticate"]
queries = AuthenticationHeaderQueries(authenticate_header)
content_response = requests.get(
queries.realm, params={"service": queries.service, "scope": queries.scope}, auth=auth
queries.realm, params={"service": queries.service, "scope": queries.scopes}, auth=auth
)
content_response.raise_for_status()
token = content_response.json()["token"]
Expand Down

0 comments on commit d23b907

Please sign in to comment.