From e2d33c7367617f2010bf984f50ad500ad805b017 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Thu, 21 Jul 2022 02:22:27 +0000 Subject: [PATCH] use init container --- api/v1alpha1/common.go | 19 + api/v1alpha1/function_types.go | 3 +- api/v1alpha1/sink_types.go | 3 +- api/v1alpha1/source_types.go | 3 +- api/v1alpha1/zz_generated.deepcopy.go | 20 + ...ompute.functionmesh.io-functionmeshes.yaml | 63 ++++ ...crd-compute.functionmesh.io-functions.yaml | 21 ++ .../crd-compute.functionmesh.io-sinks.yaml | 21 ++ .../crd-compute.functionmesh.io-sources.yaml | 21 ++ ...ompute.functionmesh.io_functionmeshes.yaml | 63 ++++ .../compute.functionmesh.io_functions.yaml | 21 ++ .../bases/compute.functionmesh.io_sinks.yaml | 21 ++ .../compute.functionmesh.io_sources.yaml | 21 ++ controllers/spec/common.go | 241 +++++++++--- controllers/spec/common_test.go | 357 ++++++++++++++++-- controllers/spec/function.go | 22 +- controllers/spec/sink.go | 17 +- controllers/spec/source.go | 17 +- controllers/spec/utils.go | 4 + manifests/crd.yaml | 126 +++++++ 20 files changed, 986 insertions(+), 98 deletions(-) diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index c23e4368d..bbdb3c89d 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -60,6 +60,8 @@ type PulsarMessaging struct { // To replace the TLSSecret TLSConfig *PulsarTLSConfig `json:"tlsConfig,omitempty"` + + Oauth2Config *Oauth2Config `json:"oauth2Config,omitempty"` } type TLSConfig struct { @@ -102,6 +104,23 @@ func (c *PulsarTLSConfig) GetMountPath() string { return "/etc/tls/pulsar-functions" } +type Oauth2Config struct { + Audience string `json:"audience"` + ClientId string `json:"clientId"` + IssuerUrl string `json:"issuerUrl"` + // the secret name of the Oauth2 key file + KeySecretName string `json:"keySecretName"` + KeySecretKey string `json:"keySecretKey"` +} + +func (o *Oauth2Config) GetMountPath() string { + return "/etc/oauth2" +} + +func (o *Oauth2Config) GetMountFile() string { + return fmt.Sprintf("%s/%s", o.GetMountPath(), o.KeySecretKey) +} + type PulsarStateStore struct { // The service url points to the state store service // By default, the state store service is bookkeeper table service diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index f20c3b07e..d08e9a7c3 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -37,7 +37,8 @@ type FunctionSpec struct { ClusterName string `json:"clusterName,omitempty"` // +kubebuilder:validation:Required // +kubebuilder:validation:Minimum=1 - Replicas *int32 `json:"replicas"` + Replicas *int32 `json:"replicas"` + DownloaderImage string `json:"downloaderImage,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index ab05a190c..b404b5e53 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -38,7 +38,8 @@ type SinkSpec struct { SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector // +kubebuilder:validation:Required // +kubebuilder:validation:Minimum=1 - Replicas *int32 `json:"replicas"` + Replicas *int32 `json:"replicas"` + DownloaderImage string `json:"downloaderImage,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index 13d3a78f0..f896d7fb0 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -37,7 +37,8 @@ type SourceSpec struct { ClusterName string `json:"clusterName,omitempty"` SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector // +kubebuilder:validation:Required - Replicas *int32 `json:"replicas"` + Replicas *int32 `json:"replicas"` + DownloaderImage string `json:"downloaderImage,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 338ae9041..f1d100a07 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -495,6 +495,21 @@ func (in *Messaging) DeepCopy() *Messaging { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Oauth2Config) DeepCopyInto(out *Oauth2Config) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Oauth2Config. +func (in *Oauth2Config) DeepCopy() *Oauth2Config { + if in == nil { + return nil + } + out := new(Oauth2Config) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OutputConf) DeepCopyInto(out *OutputConf) { *out = *in @@ -653,6 +668,11 @@ func (in *PulsarMessaging) DeepCopyInto(out *PulsarMessaging) { *out = new(PulsarTLSConfig) **out = **in } + if in.Oauth2Config != nil { + in, out := &in.Oauth2Config, &out.Oauth2Config + *out = new(Oauth2Config) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarMessaging. diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index dfc8c3749..0e44ed113 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -39,6 +39,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean funcConfig: @@ -2592,6 +2594,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -2725,6 +2746,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string golang: properties: go: @@ -5212,6 +5235,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -5323,6 +5365,8 @@ spec: type: string clusterName: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean golang: @@ -7793,6 +7837,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml index 17e2c4b13..fd690e2bc 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml @@ -58,6 +58,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean funcConfig: @@ -2611,6 +2613,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml index ab9f9139e..1c256ed08 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml @@ -58,6 +58,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string golang: properties: go: @@ -2545,6 +2547,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml index 819db0546..79d1623c4 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml @@ -52,6 +52,8 @@ spec: type: string clusterName: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean golang: @@ -2522,6 +2524,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index d12ff2258..8d4583f80 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -41,6 +41,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean funcConfig: @@ -2594,6 +2596,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -2727,6 +2748,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string golang: properties: go: @@ -5214,6 +5237,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -5325,6 +5367,8 @@ spec: type: string clusterName: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean golang: @@ -7795,6 +7839,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 9f6fda17c..66807b75e 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -38,6 +38,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean funcConfig: @@ -2591,6 +2593,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 4ebf49bfe..45cae4724 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -38,6 +38,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string golang: properties: go: @@ -2525,6 +2527,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 3fc9e19d9..513966552 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -32,6 +32,8 @@ spec: type: string clusterName: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean golang: @@ -2502,6 +2504,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 93bf07d17..d3986e12c 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -42,7 +42,7 @@ const ( DefaultJavaRunnerImage = DefaultRunnerPrefix + "pulsar-functions-java-runner:" + DefaultRunnerTag DefaultPythonRunnerImage = DefaultRunnerPrefix + "pulsar-functions-python-runner:" + DefaultRunnerTag DefaultGoRunnerImage = DefaultRunnerPrefix + "pulsar-functions-go-runner:" + DefaultRunnerTag - PulsarAdminExecutableFile = "/pulsar/bin/pulsar-admin" + WorkDir = "/pulsar/" DefaultForAllowInsecure = "false" DefaultForEnableHostNameVerification = "true" @@ -64,6 +64,12 @@ const ( DefaultRunnerUserID int64 = 10000 DefaultRunnerGroupID int64 = 10001 + + PulsarctlExecutableFile = "/usr/local/bin/pulsarctl" + DownloaderName = "downloader" + DownloaderVolume = "downloader-volume" + DownloaderImage = "streamnative/pulsarctl:2.9.2.24" + DownloadDir = "/pulsar/download" ) var GRPCPort = corev1.ContainerPort{ @@ -114,28 +120,69 @@ func MakeHeadlessServiceName(serviceName string) string { return fmt.Sprintf("%s-headless", serviceName) } -func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, container *corev1.Container, - volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy) *appsv1.StatefulSet { +func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, container *corev1.Container, + volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, pulsar v1alpha1.PulsarMessaging, + javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime, goRuntime *v1alpha1.GoRuntime) *appsv1.StatefulSet { + + volumeMounts := generateDownloaderVolumeMountsForDownloader(pulsar.Oauth2Config, javaRuntime, pythonRuntime, goRuntime) + var downloaderContainer *corev1.Container = nil + var podVolumes = volumes + // there must be a download path specified, we need to create an init container and emptyDir volume + if volumeMounts != nil { + var downloadPath, componentPackage string + if javaRuntime != nil { + downloadPath = javaRuntime.JarLocation + componentPackage = javaRuntime.Jar + } else if pythonRuntime != nil { + downloadPath = pythonRuntime.PyLocation + componentPackage = pythonRuntime.Py + } else { + downloadPath = goRuntime.GoLocation + componentPackage = goRuntime.Go + } + + image := downloaderImage + if image == "" { + image = DownloaderImage + } + + componentPackage = fmt.Sprintf("%s/%s", DownloadDir, getFilenameOfComponentPackage(componentPackage)) + downloaderContainer = &corev1.Container{ + Name: DownloaderName, + Image: image, + Command: []string{"sh", "-c", strings.Join(getDownloadCommand(downloadPath, componentPackage, pulsar.TLSSecret != "", pulsar.AuthSecret != "", pulsar.TLSConfig, pulsar.Oauth2Config), " ")}, + VolumeMounts: volumeMounts, + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{{ + Name: "HOME", + Value: "/tmp", + }}, + EnvFrom: generateContainerEnvFrom(pulsar.PulsarConfig, pulsar.AuthSecret, pulsar.TLSSecret), + } + podVolumes = append(podVolumes, corev1.Volume{ + Name: DownloaderVolume, + }) + } return &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", APIVersion: "apps/v1", }, ObjectMeta: *objectMeta, - Spec: *MakeStatefulSetSpec(replicas, container, volumes, labels, policy, - MakeHeadlessServiceName(objectMeta.Name)), + Spec: *MakeStatefulSetSpec(replicas, container, podVolumes, labels, policy, + MakeHeadlessServiceName(objectMeta.Name), downloaderContainer), } } func MakeStatefulSetSpec(replicas *int32, container *corev1.Container, volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, - serviceName string) *appsv1.StatefulSetSpec { + serviceName string, downloaderContainer *corev1.Container) *appsv1.StatefulSetSpec { return &appsv1.StatefulSetSpec{ Replicas: replicas, Selector: &metav1.LabelSelector{ MatchLabels: labels, }, - Template: *MakePodTemplate(container, volumes, labels, policy), + Template: *MakePodTemplate(container, volumes, labels, policy, downloaderContainer), PodManagementPolicy: appsv1.ParallelPodManagement, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ Type: appsv1.RollingUpdateStatefulSetStrategyType, @@ -145,18 +192,22 @@ func MakeStatefulSetSpec(replicas *int32, container *corev1.Container, } func MakePodTemplate(container *corev1.Container, volumes []corev1.Volume, - labels map[string]string, policy v1alpha1.PodPolicy) *corev1.PodTemplateSpec { + labels map[string]string, policy v1alpha1.PodPolicy, downloaderContainer *corev1.Container) *corev1.PodTemplateSpec { podSecurityContext := getDefaultRunnerPodSecurityContext(DefaultRunnerUserID, DefaultRunnerGroupID, false) if policy.SecurityContext != nil { podSecurityContext = policy.SecurityContext } + initContainers := policy.InitContainers + if downloaderContainer != nil { + initContainers = append(initContainers, *downloaderContainer) + } return &corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: mergeLabels(labels, Configs.ResourceLabels, policy.Labels), Annotations: generateAnnotations(Configs.ResourceAnnotations, policy.Annotations), }, Spec: corev1.PodSpec{ - InitContainers: policy.InitContainers, + InitContainers: initContainers, Containers: append(policy.Sidecars, *container), TerminationGracePeriodSeconds: &policy.TerminationGracePeriodSeconds, Volumes: volumes, @@ -170,69 +221,76 @@ func MakePodTemplate(container *corev1.Container, volumes []corev1.Volume, } } -func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, details, memory, extraDependenciesDir, uid string, +func MakeJavaFunctionCommand(packageFile, name, clusterName, details, memory, extraDependenciesDir, uid string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + strings.Join(getProcessJavaRuntimeArgs(name, packageFile, clusterName, details, memory, extraDependenciesDir, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig), " ") - if downloadPath != "" { - // prepend download command if the downPath is provided - downloadCommand := strings.Join(getDownloadCommand(downloadPath, packageFile, authProvided, tlsProvided, tlsConfig), " ") - processCommand = downloadCommand + " && " + processCommand - } return []string{"sh", "-c", processCommand} } -func MakePythonFunctionCommand(downloadPath, packageFile, name, clusterName, details, uid string, +func MakePythonFunctionCommand(packageFile, name, clusterName, details, uid string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + strings.Join(getProcessPythonRuntimeArgs(name, packageFile, clusterName, details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig), " ") - if downloadPath != "" { - // prepend download command if the downPath is provided - downloadCommand := strings.Join(getDownloadCommand(downloadPath, packageFile, authProvided, tlsProvided, tlsConfig), " ") - processCommand = downloadCommand + " && " + processCommand - } return []string{"sh", "-c", processCommand} } -func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alpha1.Function) []string { +func MakeGoFunctionCommand(goExecFilePath string, function *v1alpha1.Function) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + strings.Join(getProcessGoRuntimeArgs(goExecFilePath, function), " ") - if downloadPath != "" { - // prepend download command if the downPath is provided - downloadCommand := strings.Join(getDownloadCommand(downloadPath, goExecFilePath, - function.Spec.Pulsar.AuthSecret != "", function.Spec.Pulsar.TLSSecret != "", function.Spec.Pulsar.TLSConfig), " ") - processCommand = downloadCommand + " && ls -al && pwd &&" + processCommand - } return []string{"sh", "-c", processCommand} } -func getDownloadCommand(downloadPath, componentPackage string, authProvided, tlsProvided bool, tlsConfig TLSConfig) []string { - // The download path is the path that the package saved in the pulsar. - // By default, it's the path that the package saved in the pulsar, we can use package name - // to replace it for downloading packages from packages management service. - args := []string{ - PulsarAdminExecutableFile, - "--admin-url", - "$webServiceURL", - } - if authProvided { - args = append(args, []string{ - "--auth-plugin", - "$clientAuthenticationPlugin", - "--auth-params", - "$clientAuthenticationParameters"}...) +func getDownloadCommand(downloadPath, componentPackage string, tlsProvided, authProvided bool, tlsConfig TLSConfig, oauth2Config *v1alpha1.Oauth2Config) []string { + var args []string + // activate oauth2 for pulsarctl + if oauth2Config != nil { + args = []string{ + PulsarctlExecutableFile, + "context", + "set", + "downloader", + "--admin-service-url", + "$webServiceURL", + "--issuer-endpoint", + oauth2Config.IssuerUrl, + "--client-id", + oauth2Config.ClientId, + "--audience", + oauth2Config.Audience, + "--key-file", + oauth2Config.GetMountFile(), + "&& " + PulsarctlExecutableFile, + "oauth2", + "activate", + "&& " + PulsarctlExecutableFile, + } + } else { + args = []string{ + PulsarctlExecutableFile, + "--admin-service-url", + "$webServiceURL", + } + + if authProvided { + args = append(args, []string{ + "--auth-plugin", + "$clientAuthenticationPlugin", + "--auth-params", + "$clientAuthenticationParameters", + }...) + } + } // Use traditional way if reflect.ValueOf(tlsConfig).IsNil() { if tlsProvided { args = append(args, []string{ - "--tls-allow-insecure", - "${tlsAllowInsecureConnection:-" + DefaultForAllowInsecure + "}", - "--tls-enable-hostname-verification", - "${tlsHostnameVerificationEnable:-" + DefaultForEnableHostNameVerification + "}", + "--tls-allow-insecure=${tlsAllowInsecureConnection:-" + DefaultForAllowInsecure + "}", + "--tls-enable-hostname-verification=${tlsHostnameVerificationEnable:-" + DefaultForEnableHostNameVerification + "}", "--tls-trust-cert-path", "$tlsTrustCertsFilePath", }...) @@ -255,6 +313,7 @@ func getDownloadCommand(downloadPath, componentPackage string, authProvided, tls } } } + if hasPackageNamePrefix(downloadPath) { args = append(args, []string{ "packages", @@ -634,6 +693,17 @@ func generateVolumeFromTLSConfig(tlsConfig TLSConfig) corev1.Volume { } } +func generateVolumeFromOauth2Config(config *v1alpha1.Oauth2Config) corev1.Volume { + return corev1.Volume{ + Name: generateVolumeNameFromOauth2Config(*config), + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: config.KeySecretName, + }, + }, + } +} + func generateVolumeMountFromCryptoSecret(secret *v1alpha1.CryptoSecret) corev1.VolumeMount { return corev1.VolumeMount{ Name: generateVolumeNameFromCryptoSecrets(secret), @@ -648,6 +718,13 @@ func generateVolumeMountFromTLSConfig(tlsConfig TLSConfig) corev1.VolumeMount { } } +func generateVolumeMountFromOauthConfig(oauth2Config v1alpha1.Oauth2Config) corev1.VolumeMount { + return corev1.VolumeMount{ + Name: generateVolumeNameFromOauth2Config(oauth2Config), + MountPath: oauth2Config.GetMountPath(), + } +} + func generateContainerVolumeMountsFromConsumerConfigs(confs map[string]v1alpha1.ConsumerConfig) []corev1.VolumeMount { mounts := []corev1.VolumeMount{} if len(confs) > 0 { @@ -664,6 +741,52 @@ func generateContainerVolumeMountsFromConsumerConfigs(confs map[string]v1alpha1. return mounts } +func generateDownloaderVolumeMountsForDownloader(oauth2Config *v1alpha1.Oauth2Config, javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime, + goRuntime *v1alpha1.GoRuntime) []corev1.VolumeMount { + vms := []corev1.VolumeMount{} + if oauth2Config != nil { + vms = append(vms, generateVolumeMountFromOauthConfig(*oauth2Config)) + } + + if (javaRuntime != nil && javaRuntime.JarLocation != "") || + (pythonRuntime != nil && pythonRuntime.PyLocation != "") || + (goRuntime != nil && goRuntime.GoLocation != "") { + vms = append(vms, corev1.VolumeMount{ + Name: DownloaderVolume, + MountPath: DownloadDir, + }) + } + return vms +} + +func generateDownloaderVolumeMountsForRuntime(javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime, + goRuntime *v1alpha1.GoRuntime) []corev1.VolumeMount { + downloadPath := "" + if javaRuntime != nil && javaRuntime.JarLocation != "" { + downloadPath = javaRuntime.Jar + } else if pythonRuntime != nil && pythonRuntime.PyLocation != "" { + downloadPath = pythonRuntime.Py + } else if goRuntime != nil && goRuntime.GoLocation != "" { + downloadPath = goRuntime.Go + } + + if downloadPath != "" { + subPath := getFilenameOfComponentPackage(downloadPath) + mountPath := downloadPath + // for relative path, volume should be mounted to the WorkDir + if !strings.HasPrefix(downloadPath, "/") { + mountPath = WorkDir + downloadPath + } + return []corev1.VolumeMount{{ + Name: DownloaderVolume, + MountPath: mountPath, + SubPath: subPath, + ReadOnly: true, + }} + } + return nil +} + func generateContainerVolumeMountsFromProducerConf(conf *v1alpha1.ProducerConfig) []corev1.VolumeMount { mounts := []corev1.VolumeMount{} if conf != nil && conf.CryptoConfig != nil && len(conf.CryptoConfig.CryptoSecrets) > 0 { @@ -677,24 +800,36 @@ func generateContainerVolumeMountsFromProducerConf(conf *v1alpha1.ProducerConfig } func generateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerConf *v1alpha1.ProducerConfig, - consumerConfs map[string]v1alpha1.ConsumerConfig, tlsConfig TLSConfig) []corev1.VolumeMount { + consumerConfs map[string]v1alpha1.ConsumerConfig, + tlsConfig TLSConfig, + oauthConfig *v1alpha1.Oauth2Config, + javaRuntime *v1alpha1.JavaRuntime, + pythonRuntime *v1alpha1.PythonRuntime, + goRuntime *v1alpha1.GoRuntime) []corev1.VolumeMount { mounts := []corev1.VolumeMount{} mounts = append(mounts, volumeMounts...) if !reflect.ValueOf(tlsConfig).IsNil() && tlsConfig.HasSecretVolume() { mounts = append(mounts, generateVolumeMountFromTLSConfig(tlsConfig)) } + if oauthConfig != nil { + mounts = append(mounts, generateVolumeMountFromOauthConfig(*oauthConfig)) + } + mounts = append(mounts, generateDownloaderVolumeMountsForRuntime(javaRuntime, pythonRuntime, goRuntime)...) mounts = append(mounts, generateContainerVolumeMountsFromProducerConf(producerConf)...) mounts = append(mounts, generateContainerVolumeMountsFromConsumerConfigs(consumerConfs)...) return mounts } func generatePodVolumes(podVolumes []corev1.Volume, producerConf *v1alpha1.ProducerConfig, - consumerConfs map[string]v1alpha1.ConsumerConfig, tlsConfig TLSConfig) []corev1.Volume { + consumerConfs map[string]v1alpha1.ConsumerConfig, tlsConfig TLSConfig, oauth2Config *v1alpha1.Oauth2Config) []corev1.Volume { volumes := []corev1.Volume{} volumes = append(volumes, podVolumes...) if !reflect.ValueOf(tlsConfig).IsNil() && tlsConfig.HasSecretVolume() { volumes = append(volumes, generateVolumeFromTLSConfig(tlsConfig)) } + if oauth2Config != nil { + volumes = append(volumes, generateVolumeFromOauth2Config(oauth2Config)) + } volumes = append(volumes, generateContainerVolumesFromProducerConf(producerConf)...) volumes = append(volumes, generateContainerVolumesFromConsumerConfigs(consumerConfs)...) return volumes @@ -811,3 +946,11 @@ func getDecimalSIMemory(quantity *resource.Quantity) string { func getTLSTrustCertPath(tlsVolume TLSConfig, path string) string { return fmt.Sprintf("%s/%s", tlsVolume.GetMountPath(), path) } + +func getFilenameOfComponentPackage(componentPackage string) string { + data := strings.Split(componentPackage, "/") + if len(data) > 0 { + return data[len(data)-1] + } + return componentPackage +} diff --git a/controllers/spec/common_test.go b/controllers/spec/common_test.go index c1a15a4e4..59d0d25bf 100644 --- a/controllers/spec/common_test.go +++ b/controllers/spec/common_test.go @@ -31,66 +31,147 @@ import ( ) func TestGetDownloadCommand(t *testing.T) { - doTest := func(downloadPath, componentPackage string, expectedCommand []string) { - var tlsConfig *v1alpha1.PulsarTLSConfig - actualResult := getDownloadCommand(downloadPath, componentPackage, false, false, tlsConfig) - assert.Equal(t, expectedCommand, actualResult) - } - testData := []struct { downloadPath string componentPackage string + tlsConfig *v1alpha1.PulsarTLSConfig + oauth2Config *v1alpha1.Oauth2Config expectedCommand []string }{ // test get the download command with package name - {"function://public/default/test@v1", "function-package.jar", + {"function://public/default/test@v1", "function-package.jar", nil, nil, []string{ - PulsarAdminExecutableFile, - "--admin-url", "$webServiceURL", + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", "packages", "download", "function://public/default/test@v1", "--path", "function-package.jar", }, }, - {"sink://public/default/test@v1", "sink-package.jar", + {"sink://public/default/test@v1", "sink-package.jar", nil, nil, []string{ - PulsarAdminExecutableFile, - "--admin-url", "$webServiceURL", + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", "packages", "download", "sink://public/default/test@v1", "--path", "sink-package.jar", }, }, - {"source://public/default/test@v1", "source-package.jar", + {"source://public/default/test@v1", "source-package.jar", nil, nil, []string{ - PulsarAdminExecutableFile, - "--admin-url", "$webServiceURL", + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", "packages", "download", "source://public/default/test@v1", "--path", "source-package.jar", }, }, // test get the download command with normal name - {"/test", "test.jar", + {"/test", "test.jar", nil, nil, []string{ - PulsarAdminExecutableFile, - "--admin-url", "$webServiceURL", + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", "functions", "download", "--path", "/test", "--destination-file", "test.jar", }, }, // test get the download command with a wrong package name - {"source/public/default/test@v1", "source-package.jar", + {"source/public/default/test@v1", "source-package.jar", nil, nil, []string{ - PulsarAdminExecutableFile, - "--admin-url", "$webServiceURL", + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", "functions", "download", "--path", "source/public/default/test@v1", "--destination-file", "source-package.jar", }, }, - {"source:/public/default/test@v1", "source-package.jar", + {"source:/public/default/test@v1", "source-package.jar", nil, nil, []string{ - PulsarAdminExecutableFile, - "--admin-url", "$webServiceURL", + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", "functions", "download", "--path", "source:/public/default/test@v1", "--destination-file", "source-package.jar", }, }, + // test get the download command with an oauth2 config + {"function://public/default/test@v1", "function-package.jar", nil, + &v1alpha1.Oauth2Config{ + Audience: "test-audience", + ClientId: "test-client-id", + IssuerUrl: "test-issuer-url", + KeySecretName: "test-private-key", + KeySecretKey: "auth.json", + }, + []string{ + PulsarctlExecutableFile, + "context", + "set", + "downloader", + "--admin-service-url", + "$webServiceURL", + "--issuer-endpoint", + "test-issuer-url", + "--client-id", + "test-client-id", + "--audience", + "test-audience", + "--key-file", + "/etc/oauth2/auth.json", + "&& " + PulsarctlExecutableFile, + "oauth2", + "activate", + "&& " + PulsarctlExecutableFile, + "packages", "download", "function://public/default/test@v1", "--path", "function-package.jar", + }, + }, + // test get the download command with a tls config + {"function://public/default/test@v1", "function-package.jar", + &v1alpha1.PulsarTLSConfig{ + TLSConfig: v1alpha1.TLSConfig{ + Enabled: true, + AllowInsecure: false, + HostnameVerification: true, + CertSecretName: "test-secret", + CertSecretKey: "test-key", + }, + }, nil, + []string{ + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", + "--tls-enable-hostname-verification", + "--tls-trust-cert-path", "/etc/tls/pulsar-functions/test-key", + "packages", "download", "function://public/default/test@v1", "--path", "function-package.jar", + }, + }, + {"function://public/default/test@v1", "function-package.jar", + &v1alpha1.PulsarTLSConfig{ + TLSConfig: v1alpha1.TLSConfig{ + Enabled: false, + AllowInsecure: false, + HostnameVerification: true, + CertSecretName: "test-secret", + CertSecretKey: "test-key", + }, + }, nil, + []string{ + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", + "packages", "download", "function://public/default/test@v1", "--path", "function-package.jar", + }, + }, + {"function://public/default/test@v1", "function-package.jar", + &v1alpha1.PulsarTLSConfig{ + TLSConfig: v1alpha1.TLSConfig{ + Enabled: true, + AllowInsecure: true, + HostnameVerification: false, + CertSecretName: "test-secret", + CertSecretKey: "test-key", + }, + }, nil, + []string{ + PulsarctlExecutableFile, + "--admin-service-url", "$webServiceURL", + "--tls-allow-insecure", + "--tls-trust-cert-path", "/etc/tls/pulsar-functions/test-key", + "packages", "download", "function://public/default/test@v1", "--path", "function-package.jar", + }, + }, } for _, v := range testData { - doTest(v.downloadPath, v.componentPackage, v.expectedCommand) + actualResult := getDownloadCommand(v.downloadPath, v.componentPackage, false, false, v.tlsConfig, v.oauth2Config) + assert.Equal(t, v.expectedCommand, actualResult) } } @@ -179,7 +260,7 @@ func TestGetSourceRunnerImage(t *testing.T) { func TestMakeGoFunctionCommand(t *testing.T) { function := makeGoFunctionSample(TestFunctionName) - commands := MakeGoFunctionCommand("", "/pulsar/go-func", function) + commands := MakeGoFunctionCommand("/pulsar/go-func", function) assert.Equal(t, commands[0], "sh") assert.Equal(t, commands[1], "-c") assert.True(t, strings.HasPrefix(commands[2], "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}")) @@ -257,6 +338,7 @@ func TestGeneratePodVolumes(t *testing.T) { producerConf *v1alpha1.ProducerConfig consumerConfs map[string]v1alpha1.ConsumerConfig trustCert *v1alpha1.PulsarTLSConfig + oauth2Config *v1alpha1.Oauth2Config } tests := []struct { name string @@ -432,10 +514,101 @@ func TestGeneratePodVolumes(t *testing.T) { }, }, }, + { + name: "generate pod volumes from oauth2 config", + args: args{ + podVolumes: nil, + producerConf: &v1alpha1.ProducerConfig{ + CryptoConfig: &v1alpha1.CryptoConfig{ + CryptoSecrets: []v1alpha1.CryptoSecret{{ + SecretName: "test-producer-secret", + SecretKey: "test-producer-key", + }}, + }, + }, + consumerConfs: map[string]v1alpha1.ConsumerConfig{ + "test-consumer": { + CryptoConfig: &v1alpha1.CryptoConfig{ + CryptoSecrets: []v1alpha1.CryptoSecret{{ + SecretName: "test-consumer-secret", + SecretKey: "test-consumer-key", + }}, + }, + }, + }, + trustCert: &v1alpha1.PulsarTLSConfig{ + TLSConfig: v1alpha1.TLSConfig{ + Enabled: true, + AllowInsecure: true, + HostnameVerification: true, + CertSecretName: "test-trust-secret", + CertSecretKey: "test-trust-key", + }, + }, + oauth2Config: &v1alpha1.Oauth2Config{ + Audience: "test-audience", + ClientId: "test-client-id", + IssuerUrl: "test-issuer-url", + KeySecretName: "oauth2", + }, + }, + want: []corev1.Volume{ + { + Name: "test-trust-secret-test-trust-key", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "test-trust-secret", + Items: []corev1.KeyToPath{ + { + Key: "test-trust-key", + Path: "test-trust-key", + }, + }, + }, + }, + }, + { + Name: "oauth2", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "oauth2", + }, + }, + }, + { + Name: "test-producer-secret-test-producer-key", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "test-producer-secret", + Items: []corev1.KeyToPath{ + { + Key: "test-producer-key", + Path: "test-producer-key", + }, + }, + }, + }, + }, + { + Name: "test-consumer-secret-test-consumer-key", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "test-consumer-secret", + Items: []corev1.KeyToPath{ + { + Key: "test-consumer-key", + Path: "test-consumer-key", + }, + }, + }, + }, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, generatePodVolumes(tt.args.podVolumes, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert), "generatePodVolumes(%v, %v, %v, %v)", tt.args.podVolumes, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) + assert.Equalf(t, tt.want, generatePodVolumes(tt.args.podVolumes, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert, tt.args.oauth2Config), "generatePodVolumes(%v, %v, %v, %v)", tt.args.podVolumes, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) }) } } @@ -446,6 +619,8 @@ func TestGenerateContainerVolumeMounts(t *testing.T) { producerConf *v1alpha1.ProducerConfig consumerConfs map[string]v1alpha1.ConsumerConfig trustCert *v1alpha1.PulsarTLSConfig + oauth2Config *v1alpha1.Oauth2Config + javaRuntime *v1alpha1.JavaRuntime } tests := []struct { name string @@ -569,10 +744,136 @@ func TestGenerateContainerVolumeMounts(t *testing.T) { }, }, }, + { + name: "generate volume mounts from oauth2 config", + args: args{ + producerConf: &v1alpha1.ProducerConfig{ + CryptoConfig: &v1alpha1.CryptoConfig{ + CryptoSecrets: []v1alpha1.CryptoSecret{{ + SecretName: "test-producer-secret", + SecretKey: "test-producer-key", + AsVolume: "/test-producer", + }}, + }, + }, + consumerConfs: map[string]v1alpha1.ConsumerConfig{ + "test-consumer": { + CryptoConfig: &v1alpha1.CryptoConfig{ + CryptoSecrets: []v1alpha1.CryptoSecret{{ + SecretName: "test-consumer-secret", + SecretKey: "test-consumer-key", + AsVolume: "/test-consumer", + }}, + }, + }, + }, + trustCert: &v1alpha1.PulsarTLSConfig{ + TLSConfig: v1alpha1.TLSConfig{ + Enabled: true, + AllowInsecure: true, + HostnameVerification: true, + CertSecretName: "test-trust-secret", + CertSecretKey: "test-trust-key", + }, + }, + oauth2Config: &v1alpha1.Oauth2Config{ + Audience: "test-audience", + ClientId: "test-client-id", + IssuerUrl: "test-issuer-url", + KeySecretName: "oauth2", + }, + }, + want: []corev1.VolumeMount{ + { + Name: "test-trust-secret-test-trust-key", + MountPath: "/etc/tls/pulsar-functions", + }, + { + Name: "oauth2", + MountPath: "/etc/oauth2", + }, + { + Name: "test-producer-secret-test-producer-key", + MountPath: "/test-producer", + }, + { + Name: "test-consumer-secret-test-consumer-key", + MountPath: "/test-consumer", + }, + }, + }, + { + name: "generate volume mounts from runtime config", + args: args{ + producerConf: &v1alpha1.ProducerConfig{ + CryptoConfig: &v1alpha1.CryptoConfig{ + CryptoSecrets: []v1alpha1.CryptoSecret{{ + SecretName: "test-producer-secret", + SecretKey: "test-producer-key", + AsVolume: "/test-producer", + }}, + }, + }, + consumerConfs: map[string]v1alpha1.ConsumerConfig{ + "test-consumer": { + CryptoConfig: &v1alpha1.CryptoConfig{ + CryptoSecrets: []v1alpha1.CryptoSecret{{ + SecretName: "test-consumer-secret", + SecretKey: "test-consumer-key", + AsVolume: "/test-consumer", + }}, + }, + }, + }, + trustCert: &v1alpha1.PulsarTLSConfig{ + TLSConfig: v1alpha1.TLSConfig{ + Enabled: true, + AllowInsecure: true, + HostnameVerification: true, + CertSecretName: "test-trust-secret", + CertSecretKey: "test-trust-key", + }, + }, + oauth2Config: &v1alpha1.Oauth2Config{ + Audience: "test-audience", + ClientId: "test-client-id", + IssuerUrl: "test-issuer-url", + KeySecretName: "oauth2", + }, + javaRuntime: &v1alpha1.JavaRuntime{ + Jar: "test.jar", + JarLocation: "/test-jar-location", + }, + }, + want: []corev1.VolumeMount{ + { + Name: "test-trust-secret-test-trust-key", + MountPath: "/etc/tls/pulsar-functions", + }, + { + Name: "oauth2", + MountPath: "/etc/oauth2", + }, + { + Name: DownloaderVolume, + MountPath: "/pulsar/test.jar", + SubPath: "test.jar", + ReadOnly: true, + }, + { + Name: "test-producer-secret-test-producer-key", + MountPath: "/test-producer", + }, + { + Name: "test-consumer-secret-test-consumer-key", + MountPath: "/test-consumer", + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, generateContainerVolumeMounts(tt.args.volumeMounts, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert), "generateContainerVolumeMounts(%v, %v, %v, %v)", tt.args.volumeMounts, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) + assert.Equalf(t, tt.want, generateContainerVolumeMounts(tt.args.volumeMounts, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert, tt.args.oauth2Config, tt.args.javaRuntime, nil, nil), "generateContainerVolumeMounts(%v, %v, %v, %v)", tt.args.volumeMounts, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) }) } } diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 3ade14386..f1155ace7 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -54,8 +54,9 @@ func MakeFunctionService(function *v1alpha1.Function) *corev1.Service { func MakeFunctionStatefulSet(function *v1alpha1.Function) *appsv1.StatefulSet { objectMeta := MakeFunctionObjectMeta(function) - return MakeStatefulSet(objectMeta, function.Spec.Replicas, - MakeFunctionContainer(function), makeFunctionVolumes(function), makeFunctionLabels(function), function.Spec.Pod) + return MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, + MakeFunctionContainer(function), makeFunctionVolumes(function), makeFunctionLabels(function), function.Spec.Pod, + *function.Spec.Pulsar, function.Spec.Java, function.Spec.Python, function.Spec.Golang) } func MakeFunctionObjectMeta(function *v1alpha1.Function) *metav1.ObjectMeta { @@ -73,14 +74,20 @@ func makeFunctionVolumes(function *v1alpha1.Function) []corev1.Volume { return generatePodVolumes(function.Spec.Pod.Volumes, function.Spec.Output.ProducerConf, function.Spec.Input.SourceSpecs, - function.Spec.Pulsar.TLSConfig) + function.Spec.Pulsar.TLSConfig, + function.Spec.Pulsar.Oauth2Config) } func makeFunctionVolumeMounts(function *v1alpha1.Function) []corev1.VolumeMount { return generateContainerVolumeMounts(function.Spec.VolumeMounts, function.Spec.Output.ProducerConf, function.Spec.Input.SourceSpecs, - function.Spec.Pulsar.TLSConfig) + function.Spec.Pulsar.TLSConfig, + function.Spec.Pulsar.Oauth2Config, + function.Spec.Java, + function.Spec.Python, + function.Spec.Golang, + ) } func MakeFunctionContainer(function *v1alpha1.Function) *corev1.Container { @@ -118,7 +125,7 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { if spec.Java != nil { if spec.Java.Jar != "" { - return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, + return MakeJavaFunctionCommand(spec.Java.Jar, spec.Name, spec.ClusterName, generateFunctionDetailsInJSON(function), getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(function.UID), spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap, @@ -126,15 +133,14 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { } } else if spec.Python != nil { if spec.Python.Py != "" { - return MakePythonFunctionCommand(spec.Python.PyLocation, spec.Python.Py, + return MakePythonFunctionCommand(spec.Python.Py, spec.Name, spec.ClusterName, generateFunctionDetailsInJSON(function), string(function.UID), spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap, function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig) } } else if spec.Golang != nil { if spec.Golang.Go != "" { - return MakeGoFunctionCommand(spec.Golang.GoLocation, spec.Golang.Go, - function) + return MakeGoFunctionCommand(spec.Golang.Go, function) } } diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index 6b946fa17..f18395a48 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -50,8 +50,9 @@ func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service { func MakeSinkStatefulSet(sink *v1alpha1.Sink) *appsv1.StatefulSet { objectMeta := MakeSinkObjectMeta(sink) - return MakeStatefulSet(objectMeta, sink.Spec.Replicas, MakeSinkContainer(sink), - makeSinkVolumes(sink), MakeSinkLabels(sink), sink.Spec.Pod) + return MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, MakeSinkContainer(sink), + makeSinkVolumes(sink), MakeSinkLabels(sink), sink.Spec.Pod, *sink.Spec.Pulsar, + sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang) } func MakeSinkServiceName(sink *v1alpha1.Sink) string { @@ -100,16 +101,22 @@ func MakeSinkLabels(sink *v1alpha1.Sink) map[string]string { } func makeSinkVolumes(sink *v1alpha1.Sink) []corev1.Volume { - return generatePodVolumes(sink.Spec.Pod.Volumes, nil, sink.Spec.Input.SourceSpecs, sink.Spec.Pulsar.TLSConfig) + return generatePodVolumes(sink.Spec.Pod.Volumes, + nil, + sink.Spec.Input.SourceSpecs, + sink.Spec.Pulsar.TLSConfig, + sink.Spec.Pulsar.Oauth2Config) } func makeSinkVolumeMounts(sink *v1alpha1.Sink) []corev1.VolumeMount { - return generateContainerVolumeMounts(sink.Spec.VolumeMounts, nil, sink.Spec.Input.SourceSpecs, sink.Spec.Pulsar.TLSConfig) + return generateContainerVolumeMounts(sink.Spec.VolumeMounts, nil, sink.Spec.Input.SourceSpecs, + sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.Oauth2Config, + sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang) } func MakeSinkCommand(sink *v1alpha1.Sink) []string { spec := sink.Spec - return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, + return MakeJavaFunctionCommand(spec.Java.Jar, spec.Name, spec.ClusterName, generateSinkDetailsInJSON(sink), getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(sink.UID), spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, nil, spec.Pulsar.TLSConfig) diff --git a/controllers/spec/source.go b/controllers/spec/source.go index 0f6d44046..d31eb99e1 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -50,8 +50,9 @@ func MakeSourceService(source *v1alpha1.Source) *corev1.Service { func MakeSourceStatefulSet(source *v1alpha1.Source) *appsv1.StatefulSet { objectMeta := MakeSourceObjectMeta(source) - return MakeStatefulSet(objectMeta, source.Spec.Replicas, MakeSourceContainer(source), - makeSourceVolumes(source), makeSourceLabels(source), source.Spec.Pod) + return MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, MakeSourceContainer(source), + makeSourceVolumes(source), makeSourceLabels(source), source.Spec.Pod, *source.Spec.Pulsar, + source.Spec.Java, source.Spec.Python, source.Spec.Golang) } func MakeSourceObjectMeta(source *v1alpha1.Source) *metav1.ObjectMeta { @@ -95,16 +96,22 @@ func makeSourceLabels(source *v1alpha1.Source) map[string]string { } func makeSourceVolumes(source *v1alpha1.Source) []corev1.Volume { - return generatePodVolumes(source.Spec.Pod.Volumes, source.Spec.Output.ProducerConf, nil, source.Spec.Pulsar.TLSConfig) + return generatePodVolumes(source.Spec.Pod.Volumes, + source.Spec.Output.ProducerConf, + nil, + source.Spec.Pulsar.TLSConfig, + source.Spec.Pulsar.Oauth2Config) } func makeSourceVolumeMounts(source *v1alpha1.Source) []corev1.VolumeMount { - return generateContainerVolumeMounts(source.Spec.VolumeMounts, source.Spec.Output.ProducerConf, nil, source.Spec.Pulsar.TLSConfig) + return generateContainerVolumeMounts(source.Spec.VolumeMounts, source.Spec.Output.ProducerConf, nil, + source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.Oauth2Config, + source.Spec.Java, source.Spec.Python, source.Spec.Golang) } func makeSourceCommand(source *v1alpha1.Source) []string { spec := source.Spec - return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, + return MakeJavaFunctionCommand(spec.Java.Jar, spec.Name, spec.ClusterName, generateSourceDetailsInJSON(source), getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(source.UID), spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, nil, spec.Pulsar.TLSConfig) diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 2f8958020..d05d8bcdf 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -383,6 +383,10 @@ func generateVolumeNameFromTLSConfig(c TLSConfig) string { return sanitizeVolumeName(c.SecretName() + "-" + c.SecretKey()) } +func generateVolumeNameFromOauth2Config(o v1alpha1.Oauth2Config) string { + return sanitizeVolumeName(o.KeySecretName) +} + var invalidDNS1123Characters = regexp.MustCompile("[^-a-z0-9]+") // sanitizeVolumeName ensures that the given volume name is a valid DNS-1123 label diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 1876c19b4..06f31c3a0 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -51,6 +51,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean funcConfig: @@ -2604,6 +2606,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -2737,6 +2758,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string golang: properties: go: @@ -5224,6 +5247,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -5335,6 +5377,8 @@ spec: type: string clusterName: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean golang: @@ -7805,6 +7849,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -7995,6 +8058,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean funcConfig: @@ -10548,6 +10613,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -10757,6 +10841,8 @@ spec: type: string deadLetterTopic: type: string + downloaderImage: + type: string golang: properties: go: @@ -13244,6 +13330,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: @@ -13431,6 +13536,8 @@ spec: type: string clusterName: type: string + downloaderImage: + type: string forwardSourceMessageProperty: type: boolean golang: @@ -15901,6 +16008,25 @@ spec: properties: authSecret: type: string + oauth2Config: + properties: + audience: + type: string + clientId: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + required: + - audience + - clientId + - issuerUrl + - keySecretKey + - keySecretName + type: object pulsarConfig: type: string tlsConfig: