Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import base64
import json
import logging
import time
Expand Down Expand Up @@ -423,7 +424,14 @@ def _get_payload(client_id: str, permission: str, attributes: dict[str, str] | N
"permission": permission,
}
if attributes:
payload["context"] = {"attributes": attributes}
# Per UMA spec, push claims using claim_token parameter with base64-encoded JSON
# Values must be arrays of strings per Keycloak documentation
# See: https://www.keycloak.org/docs/latest/authorization_services/index.html#_service_pushing_claims
claims = {key: [value] for key, value in attributes.items()}
claim_json = json.dumps(claims, sort_keys=True)
claim_token = base64.b64encode(claim_json.encode()).decode()
payload["claim_token"] = claim_token
payload["claim_token_format"] = "urn:ietf:params:oauth:token-type:jwt"

return payload

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,55 @@ def test_get_keycloak_client_with_no_credentials(self, mock_keycloak_openid, aut
)
assert client == mock_keycloak_openid.return_value

@pytest.mark.parametrize(
("client_id", "permission", "attributes", "expected_claims"),
[
# Test without attributes - no claim_token should be added
("test_client", "DAG#GET", None, None),
# Test with single attribute
("test_client", "DAG#READ", {"resource_id": "test_dag"}, {"resource_id": ["test_dag"]}),
# Test with multiple attributes
(
"test_client",
"DAG#GET",
{"resource_id": "my_dag", "dag_entity": "RUN"},
{"dag_entity": ["RUN"], "resource_id": ["my_dag"]}, # sorted by key
),
# Test with different attribute types
(
"my_client",
"Connection#POST",
{"resource_id": "conn123", "extra": "value"},
{"extra": ["value"], "resource_id": ["conn123"]}, # sorted by key
),
],
)
def test_get_payload(self, client_id, permission, attributes, expected_claims, auth_manager):
"""Test _get_payload with various attribute configurations."""
import base64

payload = auth_manager._get_payload(client_id, permission, attributes)

# Verify basic payload structure
assert payload["grant_type"] == "urn:ietf:params:oauth:grant-type:uma-ticket"
assert payload["audience"] == client_id
assert payload["permission"] == permission

if attributes is None:
# When no attributes, claim_token should not be present
assert "claim_token" not in payload
assert "claim_token_format" not in payload
else:
# When attributes are provided, claim_token should be present
assert "claim_token" in payload
assert "claim_token_format" in payload
assert payload["claim_token_format"] == "urn:ietf:params:oauth:token-type:jwt"

# Decode and verify the claim_token contains the attributes as arrays
decoded_claim = base64.b64decode(payload["claim_token"]).decode()
claims = json.loads(decoded_claim)
assert claims == expected_claims

@pytest.mark.parametrize(
("server_url", "expected_url"),
[
Expand Down