Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support mount secret file, fixed namespace #158

Merged
merged 10 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mesh-worker-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<version>v0.1.5-SNAPSHOT</version>

<properties>
<pulsar.version>2.8.0-rc-202103292206</pulsar.version>
<pulsar.version>2.8.0-rc-202105140121</pulsar.version>
<lombok.version>1.18.16</lombok.version>
<log4j2.version>2.14.0</log4j2.version>
<kubernetes-client.version>10.0.1</kubernetes-client.version>
Expand All @@ -42,7 +42,7 @@

<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<groupId>io.streamnative</groupId>
<artifactId>pulsar-functions-worker</artifactId>
<version>${pulsar.version}</version>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.kubernetes.client.util.Config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
Expand All @@ -38,11 +37,8 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
Expand Down Expand Up @@ -76,7 +72,6 @@ public class MeshWorkerService implements WorkerService {
private CustomObjectsApi customObjectsApi;
private ApiClient apiClient;
private PulsarAdmin brokerAdmin;
private Optional<KubernetesFunctionAuthProvider> authProvider;
private KubernetesRuntimeFactoryConfig factoryConfig;

private AuthenticationService authenticationService;
Expand Down Expand Up @@ -138,13 +133,6 @@ public void init(WorkerConfig workerConfig) throws Exception {
this.workerConfig = workerConfig;
this.initKubernetesClient();
this.authenticationEnabled = this.workerConfig.isAuthenticationEnabled();
if (this.workerConfig.isAuthenticationEnabled() && !StringUtils.isEmpty(this.workerConfig.getFunctionAuthProviderClassName())) {
Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();
functionAuthProvider = Optional.of(FunctionAuthProvider.getAuthProvider(workerConfig.getFunctionAuthProviderClassName()));
KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get();
kubernetesFunctionAuthProvider.initialize(coreV1Api, null, null);
this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
}
this.functions = new FunctionsImpl(() -> MeshWorkerService.this);
this.sources = new SourcesImpl(() -> MeshWorkerService.this);
this.sinks = new SinksImpl(() -> MeshWorkerService.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.collect.Maps;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPod;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumeMounts;
import io.functionmesh.compute.functions.models.V1alpha1FunctionSpecPodVolumes;
import io.functionmesh.compute.util.FunctionsUtil;
import io.functionmesh.compute.functions.models.V1alpha1Function;
import io.functionmesh.compute.MeshWorkerService;
Expand All @@ -36,10 +38,10 @@
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.service.api.Functions;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;

import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

Expand Down Expand Up @@ -102,6 +104,8 @@ public void registerFunction(final String tenant,
functionPkgUrl,
functionConfig
);
// override namespace by configuration file
v1alpha1Function.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
Map<String, String> customLabels = Maps.newHashMap();
customLabels.put(TENANT_LABEL_CLAIM, tenant);
customLabels.put(NAMESPACE_LABEL_CLAIM, namespace);
Expand All @@ -112,35 +116,7 @@ public void registerFunction(final String tenant,
pod.setLabels(customLabels);
v1alpha1Function.getSpec().setPod(pod);
try {
if (worker().getWorkerConfig().isAuthenticationEnabled()) {
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
functionDetailsBuilder.setTenant(tenant);
functionDetailsBuilder.setNamespace(namespace);
functionDetailsBuilder.setName(functionName);
worker().getAuthProvider().ifPresent(functionAuthProvider -> {
if (clientAuthenticationDataHttps != null) {
try {
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Function.getSpec().getPulsar().setAuthSecret(authSecretName);
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Function.getSpec().getPulsar().setTlsSecret(tlsSecretName);
} catch (Exception e) {
log.error("Error caching authentication data for {} {}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);


throw new RestException(
Response.Status.INTERNAL_SERVER_ERROR,
String.format("Error caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
}
}
});
}
this.upsertFunction(tenant, namespace, functionName, functionConfig, v1alpha1Function, clientAuthenticationDataHttps);
Call call = worker().getCustomObjectsApi().createNamespacedCustomObjectCall(
group,
version,
Expand Down Expand Up @@ -191,6 +167,7 @@ public void updateFunction(final String tenant,
functionConfig
);
v1alpha1Function.getMetadata().setResourceVersion(oldFn.getMetadata().getResourceVersion());
this.upsertFunction(tenant, namespace, functionName, functionConfig, v1alpha1Function, clientAuthenticationDataHttps);
Call replaceCall = worker().getCustomObjectsApi().replaceNamespacedCustomObjectCall(
group,
version,
Expand Down Expand Up @@ -304,7 +281,51 @@ public void updateFunctionOnWorkerLeader(final String tenant,
final InputStream uploadedInputStream,
final boolean delete,
URI uri,
final String clientRole) {
final String clientRole,
final AuthenticationDataSource clientAuthenticationDataHttps) {

}

private void upsertFunction(final String tenant,
final String namespace,
final String functionName,
final FunctionConfig functionConfig,
V1alpha1Function v1alpha1Function,
AuthenticationDataHttps clientAuthenticationDataHttps) {
if (worker().getWorkerConfig().isAuthenticationEnabled()) {
if (clientAuthenticationDataHttps != null) {
try {

Map<String, Object> functionsWorkerServiceCustomConfigs = worker()
.getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
Object volumes = functionsWorkerServiceCustomConfigs.get("volumes");
if (volumes != null) {
List<V1alpha1FunctionSpecPodVolumes> volumesList = (List<V1alpha1FunctionSpecPodVolumes>) volumes;
v1alpha1Function.getSpec().getPod().setVolumes(volumesList);
}
Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts");
if (volumeMounts != null) {
List<V1alpha1FunctionSpecPodVolumeMounts> volumeMountsList = (List<V1alpha1FunctionSpecPodVolumeMounts>) volumeMounts;
v1alpha1Function.getSpec().setVolumeMounts(volumeMountsList);
}
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Function.getSpec().getPulsar().setAuthSecret(authSecretName);
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
v1alpha1Function.getSpec().getClusterName(), tenant, namespace, functionName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Function.getSpec().getPulsar().setTlsSecret(tlsSecretName);
} catch (Exception e) {
log.error("Error create or update auth or tls secret for {} {}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);


throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
String.format("Error create or update auth or tls secret for %s %s:- %s",
ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ public void putFunctionState(final String tenant,
@Override
public void uploadFunction(final InputStream uploadedInputStream,
final String path,
String clientRole) {
String clientRole,
final AuthenticationDataSource clientAuthenticationDataHttps) {

}

Expand Down Expand Up @@ -376,7 +377,7 @@ public List<ConnectorDefinition> getListOfConnectors() {
}

@Override
public void reloadConnectors(String clientRole) {
public void reloadConnectors(String clientRole, final AuthenticationDataSource clientAuthenticationDataHttps) {
meshWorkerServiceSupplier.get().getConnectorsManager().reloadConnectors();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.common.collect.Maps;
import io.functionmesh.compute.sinks.models.V1alpha1Sink;
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPod;
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumeMounts;
import io.functionmesh.compute.sinks.models.V1alpha1SinkSpecPodVolumes;
import io.functionmesh.compute.util.KubernetesUtils;
import io.functionmesh.compute.util.SinksUtil;
import io.functionmesh.compute.MeshWorkerService;
Expand All @@ -40,7 +42,6 @@
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.service.api.Sinks;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;

import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.net.URI;
Expand Down Expand Up @@ -101,8 +102,7 @@ public void registerSink(
clientAuthenticationDataHttps,
ComponentTypeUtils.toString(componentType));
this.validateTenantIsExist(tenant, namespace, sinkName, clientRole);
V1alpha1Sink v1alpha1Sink;
v1alpha1Sink =
V1alpha1Sink v1alpha1Sink =
SinksUtil.createV1alpha1SkinFromSinkConfig(
kind,
group,
Expand All @@ -112,6 +112,7 @@ public void registerSink(
uploadedInputStream,
sinkConfig,
this.meshWorkerServiceSupplier.get().getConnectorsManager());
// override namesapce by configuration
v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
try {
Map<String, String> customLabels = Maps.newHashMap();
Expand All @@ -125,33 +126,7 @@ public void registerSink(
}
pod.setLabels(customLabels);
v1alpha1Sink.getSpec().setPod(pod);
if (worker().getWorkerConfig().isAuthenticationEnabled()) {
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
functionDetailsBuilder.setTenant(tenant);
functionDetailsBuilder.setNamespace(namespace);
functionDetailsBuilder.setName(sinkName);
worker().getAuthProvider().ifPresent(functionAuthProvider -> {
if (clientAuthenticationDataHttps != null) {
try {
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Sink.getSpec().getPulsar().setAuthSecret(authSecretName);
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Sink.getSpec().getPulsar().setTlsSecret(tlsSecretName);
} catch (Exception e) {
log.error("Error caching authentication data for {} {}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);


throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s",
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
}
}
});
}
this.upsertSink(tenant, namespace, sinkName, sinkConfig, v1alpha1Sink, clientAuthenticationDataHttps);
Call call =
worker().getCustomObjectsApi()
.createNamespacedCustomObjectCall(
Expand Down Expand Up @@ -211,6 +186,7 @@ public void updateSink(
sinkPkgUrl,
uploadedInputStream,
sinkConfig, this.meshWorkerServiceSupplier.get().getConnectorsManager());
this.upsertSink(tenant, namespace, sinkName, sinkConfig, v1alpha1Sink, clientAuthenticationDataHttps);
v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
v1alpha1Sink
.getMetadata()
Expand Down Expand Up @@ -355,4 +331,46 @@ public List<ConnectorDefinition> getSinkList() {
public List<ConfigFieldDefinition> getSinkConfigDefinition(String name) {
return new ArrayList<>();
}

private void upsertSink(final String tenant,
final String namespace,
final String sinkName,
final SinkConfig sinkConfig,
V1alpha1Sink v1alpha1Sink,
AuthenticationDataHttps clientAuthenticationDataHttps) {
if (worker().getWorkerConfig().isAuthenticationEnabled()) {
if (clientAuthenticationDataHttps != null) {
try {
Map<String, Object> functionsWorkerServiceCustomConfigs = worker()
.getWorkerConfig().getFunctionsWorkerServiceCustomConfigs();
Object volumes = functionsWorkerServiceCustomConfigs.get("volumes");
if (volumes != null) {
List<V1alpha1SinkSpecPodVolumes> volumesList = (List<V1alpha1SinkSpecPodVolumes>) volumes;
v1alpha1Sink.getSpec().getPod().setVolumes(volumesList);
}
Object volumeMounts = functionsWorkerServiceCustomConfigs.get("volumeMounts");
if (volumeMounts != null) {
List<V1alpha1SinkSpecPodVolumeMounts> volumeMountsList = (List<V1alpha1SinkSpecPodVolumeMounts>) volumeMounts;
v1alpha1Sink.getSpec().setVolumeMounts(volumeMountsList);
}
String authSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "auth",
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Sink.getSpec().getPulsar().setAuthSecret(authSecretName);
String tlsSecretName = KubernetesUtils.upsertSecret(kind.toLowerCase(), "tls",
v1alpha1Sink.getSpec().getClusterName(), tenant, namespace, sinkName,
worker().getWorkerConfig(), worker().getCoreV1Api(), worker().getFactoryConfig());
v1alpha1Sink.getSpec().getPulsar().setTlsSecret(tlsSecretName);
} catch (Exception e) {
log.error("Error create or update auth or tls secret data for {} {}/{}/{}",
ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);


throw new RestException(Response.Status.INTERNAL_SERVER_ERROR,
String.format("Error create or update auth or tls secret for %s %s:- %s",
ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
}
}
}
}
}
Loading