Skip to content

Commit 067e3c0

Browse files
[feat][fn] PIP-257: Support mounting k8s ServiceAccount for OIDC auth (#19888)
PIP: #19771 ### Motivation In order to make OIDC work with functions, we must give them a way to authenticate with the broker using tokens that are able to be validated by an using an Authorization Server. This PR introduces the `KubernetesServiceAccountAuthProvider`. ### Modifications * Create an `KubernetesServiceAccountAuthProvider` implementation. It adds a service account token volume projection as defined in the k8s docs [here](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection). The implementation provides a way to specify the expiration time that the token will receive. * Instead of creating a secret with the broker's trusted `ca.crt` in it, this new `KubernetesServiceAccountAuthProvider` expects a secret to already exist with the `ca.crt`. The major advantage for this implementation is that when the `ca.crt` is rotated, we can refresh it (assuming the client is configured to observe the updated file). * Add support for specifying the token's expiration and audience. * One point of divergence from the `KubernetesSecretsTokenAuthProvider` implementation is that I did not provide a way for functions to authenticate as the anonymous role. It seems like a stretch that functions would use such authentication because it will not be multi-tenant. However, if that is a concern, we can add the support. * The feature will be configured with the following yaml: ```yaml functionRuntimeFactoryConfigs: kubernetesFunctionAuthProviderConfig: brokerClientTrustCertsSecretName: "secret-name" serviceAccountTokenExpirationSeconds: 3600 serviceAccountTokenAudience: "pulsar-cluster-audience" ``` ### Verifying this change I verified the correctness of the code with unit tests. I'll verify the integration with k8s once we've determined this PR's design is correct. ### Does this pull request potentially affect one of the following parts: This adds new configuration options to the function worker. ### Documentation - [x] `doc-required` ### Matching PR in forked repository PR in forked repository: michaeljmarshall#36
1 parent dd05408 commit 067e3c0

File tree

5 files changed

+355
-1
lines changed

5 files changed

+355
-1
lines changed

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java

+12
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.kubernetes.client.openapi.apis.CoreV1Api;
2222
import io.kubernetes.client.openapi.models.V1StatefulSet;
23+
import java.util.Map;
2324
import java.util.Optional;
2425
import org.apache.pulsar.common.util.Reflections;
2526
import org.apache.pulsar.functions.proto.Function;
@@ -31,13 +32,24 @@ public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {
3132

3233
void initialize(CoreV1Api coreClient);
3334

35+
/**
36+
* @deprecated use
37+
* {@link #initialize(CoreV1Api, byte[], java.util.function.Function, Map)}
38+
*/
39+
@Deprecated(since = "3.0.0")
3440
default void initialize(CoreV1Api coreClient, byte[] caBytes,
3541
java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc) {
3642
setCaBytes(caBytes);
3743
setNamespaceProviderFunc(namespaceCustomizerFunc);
3844
initialize(coreClient);
3945
}
4046

47+
default void initialize(CoreV1Api coreClient, byte[] caBytes,
48+
java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc,
49+
Map<String, Object> config) {
50+
initialize(coreClient, caBytes, namespaceCustomizerFunc);
51+
}
52+
4153
default void setCaBytes(byte[] caBytes) {
4254

4355
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.functions.auth;
20+
21+
import io.kubernetes.client.openapi.apis.CoreV1Api;
22+
import io.kubernetes.client.openapi.models.V1Container;
23+
import io.kubernetes.client.openapi.models.V1KeyToPath;
24+
import io.kubernetes.client.openapi.models.V1PodSpec;
25+
import io.kubernetes.client.openapi.models.V1ProjectedVolumeSource;
26+
import io.kubernetes.client.openapi.models.V1SecretVolumeSource;
27+
import io.kubernetes.client.openapi.models.V1ServiceAccountTokenProjection;
28+
import io.kubernetes.client.openapi.models.V1StatefulSet;
29+
import io.kubernetes.client.openapi.models.V1Volume;
30+
import io.kubernetes.client.openapi.models.V1VolumeMount;
31+
import io.kubernetes.client.openapi.models.V1VolumeProjection;
32+
import java.nio.file.Paths;
33+
import java.util.Map;
34+
import java.util.Optional;
35+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
36+
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
37+
import org.apache.pulsar.functions.instance.AuthenticationConfig;
38+
import org.apache.pulsar.functions.proto.Function;
39+
import org.eclipse.jetty.util.StringUtil;
40+
41+
/**
42+
* Kubernetes Function Authentication Provider that adds Service Account Token Projection to a function pod's container
43+
* definition. This token can be used to authenticate the function instance with the broker and the function worker via
44+
* OpenId Connect when each server is configured to trust the kubernetes issuer. See docs for additional details.
45+
* Relevant settings:
46+
* <p>
47+
* brokerClientTrustCertsSecretName: The Kubernetes secret containing the broker's trust certs. If it is not set,
48+
* the function will not use a custom trust store. The secret must already exist in each function's target
49+
* namespace. The secret must contain a key named `ca.crt` with the trust certs. Only the ca.crt will be mounted.
50+
* </p>
51+
* <p>
52+
* serviceAccountTokenExpirationSeconds: The expiration for the token created by the
53+
* {@link KubernetesServiceAccountTokenAuthProvider}. The default value is 3600 seconds.
54+
* </p>
55+
* <p>
56+
* serviceAccountTokenAudience: The audience for the token created by the
57+
* {@link KubernetesServiceAccountTokenAuthProvider}.
58+
* </p>
59+
* Note: the pod inherits the namespace's default service account.
60+
*/
61+
public class KubernetesServiceAccountTokenAuthProvider implements KubernetesFunctionAuthProvider {
62+
63+
private static final String BROKER_CLIENT_TRUST_CERTS_SECRET_NAME = "brokerClientTrustCertsSecretName";
64+
private static final String SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS = "serviceAccountTokenExpirationSeconds";
65+
private static final String SERVICE_ACCOUNT_TOKEN_AUDIENCE = "serviceAccountTokenAudience";
66+
67+
private static final String SERVICE_ACCOUNT_VOLUME_NAME = "service-account-token";
68+
private static final String TRUST_CERT_VOLUME_NAME = "ca-cert";
69+
private static final String DEFAULT_MOUNT_DIR = "/etc/auth";
70+
private static final String FUNCTION_AUTH_TOKEN = "token";
71+
private static final String FUNCTION_CA_CERT = "ca.crt";
72+
private static final String DEFAULT_CERT_PATH = DEFAULT_MOUNT_DIR + "/" + FUNCTION_CA_CERT;
73+
private String brokerTrustCertsSecretName;
74+
private long serviceAccountTokenExpirationSeconds;
75+
private String serviceAccountTokenAudience;
76+
77+
@Override
78+
public void initialize(CoreV1Api coreClient, byte[] caBytes,
79+
java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc,
80+
Map<String, Object> config) {
81+
setNamespaceProviderFunc(namespaceCustomizerFunc);
82+
Object certSecretName = config.get(BROKER_CLIENT_TRUST_CERTS_SECRET_NAME);
83+
if (certSecretName instanceof String) {
84+
brokerTrustCertsSecretName = (String) certSecretName;
85+
} else if (certSecretName != null) {
86+
// Throw exception because user set this configuration, but it isn't valid.
87+
throw new IllegalArgumentException("Invalid value for " + BROKER_CLIENT_TRUST_CERTS_SECRET_NAME
88+
+ ". Expected a string.");
89+
}
90+
Object tokenExpirationSeconds = config.get(SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS);
91+
if (tokenExpirationSeconds instanceof Long) {
92+
serviceAccountTokenExpirationSeconds = (Long) tokenExpirationSeconds;
93+
} else if (tokenExpirationSeconds instanceof String) {
94+
try {
95+
serviceAccountTokenExpirationSeconds = Long.parseLong((String) tokenExpirationSeconds);
96+
} catch (NumberFormatException e) {
97+
throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
98+
+ ". Expected a long.");
99+
}
100+
} else if (tokenExpirationSeconds != null) {
101+
// Throw exception because user set this configuration, but it isn't valid.
102+
throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
103+
+ ". Expected a long.");
104+
}
105+
Object tokenAudience = config.get(SERVICE_ACCOUNT_TOKEN_AUDIENCE);
106+
if (tokenAudience instanceof String) {
107+
serviceAccountTokenAudience = (String) tokenAudience;
108+
} else if (tokenAudience != null) {
109+
throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_AUDIENCE
110+
+ ". Expected a string.");
111+
}
112+
}
113+
114+
@Override
115+
public void configureAuthenticationConfig(AuthenticationConfig authConfig,
116+
Optional<FunctionAuthData> functionAuthData) {
117+
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
118+
authConfig.setClientAuthenticationParameters(Paths.get(DEFAULT_MOUNT_DIR, FUNCTION_AUTH_TOKEN)
119+
.toUri().toString());
120+
if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
121+
authConfig.setTlsTrustCertsFilePath(DEFAULT_CERT_PATH);
122+
}
123+
}
124+
125+
/**
126+
* No need to cache anything. Kubernetes generates the token used for authentication.
127+
*/
128+
@Override
129+
public Optional<FunctionAuthData> cacheAuthData(Function.FunctionDetails funcDetails,
130+
AuthenticationDataSource authenticationDataSource)
131+
throws Exception {
132+
return Optional.empty();
133+
}
134+
135+
/**
136+
* No need to update anything. Kubernetes updates the token used for authentication.
137+
*/
138+
@Override
139+
public Optional<FunctionAuthData> updateAuthData(Function.FunctionDetails funcDetails,
140+
Optional<FunctionAuthData> existingFunctionAuthData,
141+
AuthenticationDataSource authenticationDataSource)
142+
throws Exception {
143+
return Optional.empty();
144+
}
145+
146+
/**
147+
* No need to clean up anything. Kubernetes cleans up the secret when the pod is deleted.
148+
*/
149+
@Override
150+
public void cleanUpAuthData(Function.FunctionDetails funcDetails, Optional<FunctionAuthData> functionAuthData)
151+
throws Exception {
152+
153+
}
154+
155+
@Override
156+
public void initialize(CoreV1Api coreClient) {
157+
}
158+
159+
@Override
160+
public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, Optional<FunctionAuthData> functionAuthData) {
161+
V1PodSpec podSpec = statefulSet.getSpec().getTemplate().getSpec();
162+
// configure pod mount secret with auth token
163+
if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
164+
podSpec.addVolumesItem(createTrustCertVolume());
165+
}
166+
podSpec.addVolumesItem(createServiceAccountVolume());
167+
podSpec.getContainers().forEach(this::addVolumeMountsToContainer);
168+
}
169+
170+
private V1Volume createServiceAccountVolume() {
171+
V1ProjectedVolumeSource projectedVolumeSource = new V1ProjectedVolumeSource();
172+
V1VolumeProjection volumeProjection = new V1VolumeProjection();
173+
volumeProjection.serviceAccountToken(
174+
new V1ServiceAccountTokenProjection()
175+
.audience(serviceAccountTokenAudience)
176+
.expirationSeconds(serviceAccountTokenExpirationSeconds)
177+
.path(FUNCTION_AUTH_TOKEN));
178+
projectedVolumeSource.addSourcesItem(volumeProjection);
179+
return new V1Volume()
180+
.name(SERVICE_ACCOUNT_VOLUME_NAME)
181+
.projected(projectedVolumeSource);
182+
}
183+
184+
private V1Volume createTrustCertVolume() {
185+
return new V1Volume()
186+
.name(TRUST_CERT_VOLUME_NAME)
187+
.secret(new V1SecretVolumeSource()
188+
.secretName(brokerTrustCertsSecretName)
189+
.addItemsItem(new V1KeyToPath()
190+
.key(FUNCTION_CA_CERT)
191+
.path(FUNCTION_CA_CERT)));
192+
}
193+
194+
private void addVolumeMountsToContainer(V1Container container) {
195+
container.addVolumeMountsItem(
196+
new V1VolumeMount()
197+
.name(SERVICE_ACCOUNT_VOLUME_NAME)
198+
.mountPath(DEFAULT_MOUNT_DIR)
199+
.readOnly(true));
200+
if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
201+
container.addVolumeMountsItem(
202+
new V1VolumeMount()
203+
.name(TRUST_CERT_VOLUME_NAME)
204+
.mountPath(DEFAULT_MOUNT_DIR)
205+
.readOnly(true));
206+
}
207+
}
208+
}

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
252252
kubernetesFunctionAuthProvider.initialize(coreClient, serverCaBytes,
253253
(funcDetails) -> getRuntimeCustomizer()
254254
.map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace))
255-
.orElse(jobNamespace));
255+
.orElse(jobNamespace), factoryConfig.getKubernetesFunctionAuthProviderConfig());
256256
this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
257257
}
258258
} else {

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.functions.runtime.kubernetes;
2020

21+
import java.util.HashMap;
2122
import java.util.Map;
2223
import lombok.Data;
2324
import lombok.experimental.Accessors;
@@ -169,4 +170,9 @@ public class KubernetesRuntimeFactoryConfig {
169170
)
170171
protected int gracePeriodSeconds = 5;
171172

173+
@FieldContext(
174+
doc = "A map of custom configurations passed to implementations of the KubernetesFunctionAuthProvider"
175+
+ " interface."
176+
)
177+
private Map<String, Object> kubernetesFunctionAuthProviderConfig = new HashMap<>();
172178
}

0 commit comments

Comments
 (0)