Skip to content

Commit c2a93ec

Browse files
[feat][fn] PIP-257: Support mounting k8s ServiceAccount for OIDC auth (apache#19888)
PIP: apache#19771 In order to make OIDC work with functions, we must give them a way to authenticate with the broker using tokens that are able to be validated by an using an Authorization Server. This PR introduces the `KubernetesServiceAccountAuthProvider`. * Create an `KubernetesServiceAccountAuthProvider` implementation. It adds a service account token volume projection as defined in the k8s docs [here](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection). The implementation provides a way to specify the expiration time that the token will receive. * Instead of creating a secret with the broker's trusted `ca.crt` in it, this new `KubernetesServiceAccountAuthProvider` expects a secret to already exist with the `ca.crt`. The major advantage for this implementation is that when the `ca.crt` is rotated, we can refresh it (assuming the client is configured to observe the updated file). * Add support for specifying the token's expiration and audience. * One point of divergence from the `KubernetesSecretsTokenAuthProvider` implementation is that I did not provide a way for functions to authenticate as the anonymous role. It seems like a stretch that functions would use such authentication because it will not be multi-tenant. However, if that is a concern, we can add the support. * The feature will be configured with the following yaml: ```yaml functionRuntimeFactoryConfigs: kubernetesFunctionAuthProviderConfig: brokerClientTrustCertsSecretName: "secret-name" serviceAccountTokenExpirationSeconds: 3600 serviceAccountTokenAudience: "pulsar-cluster-audience" ``` I verified the correctness of the code with unit tests. I'll verify the integration with k8s once we've determined this PR's design is correct. This adds new configuration options to the function worker. - [x] `doc-required` PR in forked repository: #36 (cherry picked from commit 067e3c0)
1 parent a1450f8 commit c2a93ec

File tree

5 files changed

+365
-3
lines changed

5 files changed

+365
-3
lines changed

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
import io.kubernetes.client.openapi.apis.CoreV1Api;
2222
import io.kubernetes.client.openapi.models.V1StatefulSet;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import org.apache.pulsar.common.util.Reflections;
2326
import org.apache.pulsar.functions.proto.Function;
2427
import org.apache.pulsar.common.util.Reflections;
2528

@@ -32,12 +35,24 @@ public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {
3235

3336
void initialize(CoreV1Api coreClient);
3437

35-
default void initialize(CoreV1Api coreClient, byte[] caBytes, java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc) {
38+
/**
39+
* @deprecated use
40+
* {@link #initialize(CoreV1Api, byte[], java.util.function.Function, Map)}
41+
*/
42+
@Deprecated
43+
default void initialize(CoreV1Api coreClient, byte[] caBytes,
44+
java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc) {
3645
setCaBytes(caBytes);
3746
setNamespaceProviderFunc(namespaceCustomizerFunc);
3847
initialize(coreClient);
3948
}
4049

50+
default void initialize(CoreV1Api coreClient, byte[] caBytes,
51+
java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc,
52+
Map<String, Object> config) {
53+
initialize(coreClient, caBytes, namespaceCustomizerFunc);
54+
}
55+
4156
default void setCaBytes(byte[] caBytes) {
4257

4358
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.functions.auth;
20+
21+
import io.kubernetes.client.openapi.apis.CoreV1Api;
22+
import io.kubernetes.client.openapi.models.V1Container;
23+
import io.kubernetes.client.openapi.models.V1KeyToPath;
24+
import io.kubernetes.client.openapi.models.V1PodSpec;
25+
import io.kubernetes.client.openapi.models.V1ProjectedVolumeSource;
26+
import io.kubernetes.client.openapi.models.V1SecretVolumeSource;
27+
import io.kubernetes.client.openapi.models.V1ServiceAccountTokenProjection;
28+
import io.kubernetes.client.openapi.models.V1StatefulSet;
29+
import io.kubernetes.client.openapi.models.V1Volume;
30+
import io.kubernetes.client.openapi.models.V1VolumeMount;
31+
import io.kubernetes.client.openapi.models.V1VolumeProjection;
32+
import java.nio.file.Paths;
33+
import java.util.Map;
34+
import java.util.Optional;
35+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
36+
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
37+
import org.apache.pulsar.functions.instance.AuthenticationConfig;
38+
import org.apache.pulsar.functions.proto.Function;
39+
import org.eclipse.jetty.util.StringUtil;
40+
41+
/**
42+
* Kubernetes Function Authentication Provider that adds Service Account Token Projection to a function pod's container
43+
* definition. This token can be used to authenticate the function instance with the broker and the function worker via
44+
* OpenId Connect when each server is configured to trust the kubernetes issuer. See docs for additional details.
45+
* Relevant settings:
46+
* <p>
47+
* brokerClientTrustCertsSecretName: The Kubernetes secret containing the broker's trust certs. If it is not set,
48+
* the function will not use a custom trust store. The secret must already exist in each function's target
49+
* namespace. The secret must contain a key named `ca.crt` with the trust certs. Only the ca.crt will be mounted.
50+
* </p>
51+
* <p>
52+
* serviceAccountTokenExpirationSeconds: The expiration for the token created by the
53+
* {@link KubernetesServiceAccountTokenAuthProvider}. The default value is 3600 seconds.
54+
* </p>
55+
* <p>
56+
* serviceAccountTokenAudience: The audience for the token created by the
57+
* {@link KubernetesServiceAccountTokenAuthProvider}.
58+
* </p>
59+
* Note: the pod inherits the namespace's default service account.
60+
*/
61+
public class KubernetesServiceAccountTokenAuthProvider implements KubernetesFunctionAuthProvider {
62+
63+
private static final String BROKER_CLIENT_TRUST_CERTS_SECRET_NAME = "brokerClientTrustCertsSecretName";
64+
private static final String SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS = "serviceAccountTokenExpirationSeconds";
65+
private static final String SERVICE_ACCOUNT_TOKEN_AUDIENCE = "serviceAccountTokenAudience";
66+
67+
private static final String SERVICE_ACCOUNT_VOLUME_NAME = "service-account-token";
68+
private static final String TRUST_CERT_VOLUME_NAME = "ca-cert";
69+
private static final String DEFAULT_MOUNT_DIR = "/etc/auth";
70+
private static final String FUNCTION_AUTH_TOKEN = "token";
71+
private static final String FUNCTION_CA_CERT = "ca.crt";
72+
private static final String DEFAULT_CERT_PATH = DEFAULT_MOUNT_DIR + "/" + FUNCTION_CA_CERT;
73+
private String brokerTrustCertsSecretName;
74+
private long serviceAccountTokenExpirationSeconds;
75+
private String serviceAccountTokenAudience;
76+
77+
@Override
78+
public void initialize(CoreV1Api coreClient, byte[] caBytes,
79+
java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc,
80+
Map<String, Object> config) {
81+
setNamespaceProviderFunc(namespaceCustomizerFunc);
82+
Object certSecretName = config.get(BROKER_CLIENT_TRUST_CERTS_SECRET_NAME);
83+
if (certSecretName instanceof String) {
84+
brokerTrustCertsSecretName = (String) certSecretName;
85+
} else if (certSecretName != null) {
86+
// Throw exception because user set this configuration, but it isn't valid.
87+
throw new IllegalArgumentException("Invalid value for " + BROKER_CLIENT_TRUST_CERTS_SECRET_NAME
88+
+ ". Expected a string.");
89+
}
90+
Object tokenExpirationSeconds = config.get(SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS);
91+
if (tokenExpirationSeconds instanceof Long) {
92+
serviceAccountTokenExpirationSeconds = (Long) tokenExpirationSeconds;
93+
} else if (tokenExpirationSeconds instanceof String) {
94+
try {
95+
serviceAccountTokenExpirationSeconds = Long.parseLong((String) tokenExpirationSeconds);
96+
} catch (NumberFormatException e) {
97+
throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
98+
+ ". Expected a long.");
99+
}
100+
} else if (tokenExpirationSeconds != null) {
101+
// Throw exception because user set this configuration, but it isn't valid.
102+
throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
103+
+ ". Expected a long.");
104+
}
105+
Object tokenAudience = config.get(SERVICE_ACCOUNT_TOKEN_AUDIENCE);
106+
if (tokenAudience instanceof String) {
107+
serviceAccountTokenAudience = (String) tokenAudience;
108+
} else if (tokenAudience != null) {
109+
throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_AUDIENCE
110+
+ ". Expected a string.");
111+
}
112+
}
113+
114+
@Override
115+
public void configureAuthenticationConfig(AuthenticationConfig authConfig,
116+
Optional<FunctionAuthData> functionAuthData) {
117+
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
118+
authConfig.setClientAuthenticationParameters(Paths.get(DEFAULT_MOUNT_DIR, FUNCTION_AUTH_TOKEN)
119+
.toUri().toString());
120+
if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
121+
authConfig.setTlsTrustCertsFilePath(DEFAULT_CERT_PATH);
122+
}
123+
}
124+
125+
/**
126+
* No need to cache anything. Kubernetes generates the token used for authentication.
127+
*/
128+
@Override
129+
public Optional<FunctionAuthData> cacheAuthData(Function.FunctionDetails funcDetails,
130+
AuthenticationDataSource authenticationDataSource)
131+
throws Exception {
132+
return Optional.empty();
133+
}
134+
135+
/**
136+
* No need to update anything. Kubernetes updates the token used for authentication.
137+
*/
138+
@Override
139+
public Optional<FunctionAuthData> updateAuthData(Function.FunctionDetails funcDetails,
140+
Optional<FunctionAuthData> existingFunctionAuthData,
141+
AuthenticationDataSource authenticationDataSource)
142+
throws Exception {
143+
return Optional.empty();
144+
}
145+
146+
/**
147+
* No need to clean up anything. Kubernetes cleans up the secret when the pod is deleted.
148+
*/
149+
@Override
150+
public void cleanUpAuthData(Function.FunctionDetails funcDetails, Optional<FunctionAuthData> functionAuthData)
151+
throws Exception {
152+
153+
}
154+
155+
@Override
156+
public void initialize(CoreV1Api coreClient) {
157+
}
158+
159+
@Override
160+
public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, Optional<FunctionAuthData> functionAuthData) {
161+
V1PodSpec podSpec = statefulSet.getSpec().getTemplate().getSpec();
162+
// configure pod mount secret with auth token
163+
if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
164+
podSpec.addVolumesItem(createTrustCertVolume());
165+
}
166+
podSpec.addVolumesItem(createServiceAccountVolume());
167+
podSpec.getContainers().forEach(this::addVolumeMountsToContainer);
168+
}
169+
170+
private V1Volume createServiceAccountVolume() {
171+
V1ProjectedVolumeSource projectedVolumeSource = new V1ProjectedVolumeSource();
172+
V1VolumeProjection volumeProjection = new V1VolumeProjection();
173+
volumeProjection.serviceAccountToken(
174+
new V1ServiceAccountTokenProjection()
175+
.audience(serviceAccountTokenAudience)
176+
.expirationSeconds(serviceAccountTokenExpirationSeconds)
177+
.path(FUNCTION_AUTH_TOKEN));
178+
projectedVolumeSource.addSourcesItem(volumeProjection);
179+
return new V1Volume()
180+
.name(SERVICE_ACCOUNT_VOLUME_NAME)
181+
.projected(projectedVolumeSource);
182+
}
183+
184+
private V1Volume createTrustCertVolume() {
185+
return new V1Volume()
186+
.name(TRUST_CERT_VOLUME_NAME)
187+
.secret(new V1SecretVolumeSource()
188+
.secretName(brokerTrustCertsSecretName)
189+
.addItemsItem(new V1KeyToPath()
190+
.key(FUNCTION_CA_CERT)
191+
.path(FUNCTION_CA_CERT)));
192+
}
193+
194+
private void addVolumeMountsToContainer(V1Container container) {
195+
container.addVolumeMountsItem(
196+
new V1VolumeMount()
197+
.name(SERVICE_ACCOUNT_VOLUME_NAME)
198+
.mountPath(DEFAULT_MOUNT_DIR)
199+
.readOnly(true));
200+
if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
201+
container.addVolumeMountsItem(
202+
new V1VolumeMount()
203+
.name(TRUST_CERT_VOLUME_NAME)
204+
.mountPath(DEFAULT_MOUNT_DIR)
205+
.readOnly(true));
206+
}
207+
}
208+
}

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,12 @@ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authentic
248248
throw new IllegalArgumentException("Function authentication provider "
249249
+ functionAuthProvider.get().getClass().getName() + " must implement KubernetesFunctionAuthProvider");
250250
} else {
251-
KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get();
252-
kubernetesFunctionAuthProvider.initialize(coreClient, serverCaBytes, (funcDetails) -> getRuntimeCustomizer().map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace)).orElse(jobNamespace));
251+
KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider =
252+
(KubernetesFunctionAuthProvider) functionAuthProvider.get();
253+
kubernetesFunctionAuthProvider.initialize(coreClient, serverCaBytes,
254+
(funcDetails) -> getRuntimeCustomizer()
255+
.map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace))
256+
.orElse(jobNamespace), factoryConfig.getKubernetesFunctionAuthProviderConfig());
253257
this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
254258
}
255259
} else {

pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.functions.runtime.kubernetes;
2020

21+
import java.util.HashMap;
22+
import java.util.Map;
2123
import lombok.Data;
2224
import lombok.experimental.Accessors;
2325
import org.apache.pulsar.common.configuration.FieldContext;
@@ -168,4 +170,9 @@ public class KubernetesRuntimeFactoryConfig {
168170
)
169171
protected int gracePeriodSeconds = 5;
170172

173+
@FieldContext(
174+
doc = "A map of custom configurations passed to implementations of the KubernetesFunctionAuthProvider"
175+
+ " interface."
176+
)
177+
private Map<String, Object> kubernetesFunctionAuthProviderConfig = new HashMap<>();
171178
}

0 commit comments

Comments
 (0)