Skip to content

Commit 2390f7a

Browse files
acrocamikeee
andauthored
Add kid header to kafka jwt auth (#4099)
Signed-off-by: Albert Callarisa <albert@diagrid.io> Signed-off-by: Albert Callarisa <albert@acroca.com> Co-authored-by: Mike Nguyen <hey@mike.ee>
1 parent 9b3a390 commit 2390f7a

File tree

9 files changed

+54
-3
lines changed

9 files changed

+54
-3
lines changed

bindings/kafka/metadata.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,12 @@ authenticationProfiles:
187187
example: |
188188
{"cluster":"kafka","poolid":"kafkapool"}
189189
type: string
190+
- name: oidcKid
191+
type: string
192+
required: false
193+
description: |
194+
The JWT key ID (kid) to use for the client assertion.
195+
example: '"1234567890"'
190196
- title: "SASL Authentication"
191197
description: |
192198
Authenticate using SASL.

common/component/kafka/metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type KafkaMetadata struct {
8686
OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"`
8787
OidcResource string `mapstructure:"oidcResource"`
8888
OidcAudience string `mapstructure:"oidcAudience"`
89+
OidcKid string `mapstructure:"oidcKid"`
8990
internalOidcScopes []string `mapstructure:"-"`
9091
TLSDisable bool `mapstructure:"disableTls"`
9192
TLSSkipVerify bool `mapstructure:"skipVerify"`

common/component/kafka/metadata_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,11 @@ func TestMissingOidcPrivateKeyJwtValues(t *testing.T) {
273273
meta, err = k.getKafkaMetadata(m)
274274
require.NoError(t, err)
275275
require.Contains(t, meta.internalOidcScopes, "openid")
276+
277+
m["oidcKid"] = "1234567890"
278+
meta, err = k.getKafkaMetadata(m)
279+
require.NoError(t, err)
280+
require.Equal(t, "1234567890", meta.OidcKid)
276281
}
277282

278283
func TestPresentSaslValues(t *testing.T) {

common/component/kafka/sasl_oauthbearer_private_key_jwt.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/IBM/sarama"
3232
"github.com/google/uuid"
3333
"github.com/lestrrat-go/jwx/v2/jwa"
34+
"github.com/lestrrat-go/jwx/v2/jws"
3435
"github.com/lestrrat-go/jwx/v2/jwt"
3536
"golang.org/x/oauth2"
3637
)
@@ -50,6 +51,7 @@ type OAuthTokenSourcePrivateKeyJWT struct {
5051
ClientAssertionKey string
5152
Resource string
5253
Audience string
54+
Kid string
5355
}
5456

5557
type tokenResponse struct {
@@ -70,6 +72,7 @@ func (m KafkaMetadata) getOAuthTokenSourcePrivateKeyJWT() *OAuthTokenSourcePriva
7072
ClientAssertionKey: m.OidcClientAssertionKey,
7173
Resource: m.OidcResource,
7274
Audience: m.OidcAudience,
75+
Kid: m.OidcKid,
7376
}
7477
}
7578

@@ -166,7 +169,15 @@ func (ts *OAuthTokenSourcePrivateKeyJWT) Token() (*sarama.AccessToken, error) {
166169
return nil, fmt.Errorf("failed to build token: %w", err)
167170
}
168171

169-
assertion, err := jwt.Sign(token, jwt.WithKey(jwa.RS256, rsaKey))
172+
var signOptions []jwt.Option
173+
if ts.Kid != "" {
174+
headers := jws.NewHeaders()
175+
if err = headers.Set("kid", ts.Kid); err != nil {
176+
return nil, fmt.Errorf("error setting JWT kid header: %w", err)
177+
}
178+
signOptions = append(signOptions, jws.WithProtectedHeaders(headers))
179+
}
180+
assertion, err := jwt.Sign(token, jwt.WithKey(jwa.RS256, rsaKey, signOptions...))
170181
if err != nil {
171182
return nil, fmt.Errorf("error signing client assertion: %w", err)
172183
}

pubsub/kafka/metadata.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ authenticationProfiles:
181181
example: |
182182
{"cluster":"kafka","poolid":"kafkapool"}
183183
type: string
184+
- name: oidcKid
185+
type: string
186+
required: false
187+
description: |
188+
The JWT key ID (kid) to use for the client assertion.
189+
example: '"1234567890"'
184190
- title: "SASL Authentication"
185191
description: |
186192
Authenticate using SASL.

tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ spec:
2828
value: "openid"
2929
- name: disableTls
3030
value: "true"
31+
- name: oidcKid
32+
secretKeyRef:
33+
name: OIDC_CLIENT_KID
34+
key: OIDC_CLIENT_KID
3135

3236
auth:
3337
secretStore: envvar-secret-store

tests/certification/pubsub/kafka/data/realm-export.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
"secret": "dapr-kafka-secret",
99
"clientAuthenticatorType": "client-jwt",
1010
"attributes": {
11-
"token.endpoint.auth.signing.alg": "RS256",
12-
"jwt.credential.certificate": "${OIDC_CLIENT_ASSERTION_CERT_ONELINE}"
11+
"use.jwks.url": "true",
12+
"jwks.url": "http://jwks:80/jwks.json"
1313
},
1414
"enabled": true,
1515
"protocol": "openid-connect",

tests/certification/pubsub/kafka/docker-compose.auth.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,14 @@ services:
6060
- "8080:8080"
6161
volumes:
6262
- ./data/realm-export.json:/opt/keycloak/data/import/local-realm.json
63+
64+
jwks:
65+
image: nginx:1.27-alpine
66+
hostname: jwks
67+
container_name: kafka_jwks
68+
environment:
69+
- OIDC_CLIENT_KID
70+
- OIDC_CLIENT_JWK_N
71+
- OIDC_CLIENT_JWK_E
72+
command: >
73+
/bin/sh -c 'printf "{\n \"keys\": [\n {\n \"kty\": \"RSA\",\n \"use\": \"sig\",\n \"alg\": \"RS256\",\n \"kid\": \"%s\",\n \"n\": \"%s\",\n \"e\": \"%s\"\n }\n ]\n}\n" "$$OIDC_CLIENT_KID" "$$OIDC_CLIENT_JWK_N" "$$OIDC_CLIENT_JWK_E" > /usr/share/nginx/html/jwks.json && exec nginx -g "daemon off;"'

tests/certification/pubsub/kafka/kafka_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"crypto/rsa"
2020
"crypto/x509"
2121
"crypto/x509/pkix"
22+
"encoding/base64"
2223
"encoding/json"
2324
"encoding/pem"
2425
"fmt"
@@ -522,6 +523,12 @@ func TestKafkaAuth(t *testing.T) {
522523
os.Setenv("OIDC_CLIENT_ASSERTION_KEY", string(keyPEM))
523524
os.Setenv("OIDC_CLIENT_ASSERTION_CERT_ONELINE", strings.ReplaceAll(string(certPEM), "\n", "\\n"))
524525

526+
modulus := key.PublicKey.N.Bytes()
527+
os.Setenv("OIDC_CLIENT_JWK_N", base64.RawURLEncoding.EncodeToString(modulus))
528+
exponent := big.NewInt(int64(key.PublicKey.E)).Bytes()
529+
os.Setenv("OIDC_CLIENT_JWK_E", base64.RawURLEncoding.EncodeToString(exponent))
530+
os.Setenv("OIDC_CLIENT_KID", uuid.New().String())
531+
525532
flow.New(t, "kafka authentication").
526533
Step(dockercompose.Run(clusterNameAuth, dockerComposeYAMLAuth)).
527534
Step("wait for broker sockets",

0 commit comments

Comments
 (0)