From 20c79df4d5781d8559d216127a92d38b41c093ac Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Wed, 13 Nov 2024 17:02:08 +0100 Subject: [PATCH] create new integration/source folder and factor common code into helper.go Signed-off-by: Matthias Wessendorf --- cmd/controller/main.go | 4 +- pkg/reconciler/integration/helper.go | 128 ++++++++++ pkg/reconciler/integration/helper_test.go | 77 ++++++ .../source}/controller.go | 2 +- .../source}/controller_test.go | 2 +- .../source}/integrationsource.go | 4 +- .../source}/integrationsource_test.go | 2 +- .../source/resources/containersource.go | 133 ++++++++++ .../source}/resources/containersource_test.go | 53 ---- .../source}/resources/names.go | 0 .../resources/containersource.go | 237 ------------------ 11 files changed, 345 insertions(+), 297 deletions(-) create mode 100644 pkg/reconciler/integration/helper.go create mode 100644 pkg/reconciler/integration/helper_test.go rename pkg/reconciler/{integrationsource => integration/source}/controller.go (99%) rename pkg/reconciler/{integrationsource => integration/source}/controller_test.go (98%) rename pkg/reconciler/{integrationsource => integration/source}/integrationsource.go (97%) rename pkg/reconciler/{integrationsource => integration/source}/integrationsource_test.go (99%) create mode 100644 pkg/reconciler/integration/source/resources/containersource.go rename pkg/reconciler/{integrationsource => integration/source}/resources/containersource_test.go (66%) rename pkg/reconciler/{integrationsource => integration/source}/resources/names.go (100%) delete mode 100644 pkg/reconciler/integrationsource/resources/containersource.go diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 616c42ee5a7..68d19d50f49 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -21,6 +21,7 @@ import ( // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "errors" + "knative.dev/eventing/pkg/reconciler/integration/source" "log" "net/http" "os" @@ -41,7 +42,6 @@ import ( "knative.dev/eventing/pkg/reconciler/channel" "knative.dev/eventing/pkg/reconciler/containersource" "knative.dev/eventing/pkg/reconciler/eventtype" - "knative.dev/eventing/pkg/reconciler/integrationsource" "knative.dev/eventing/pkg/reconciler/parallel" "knative.dev/eventing/pkg/reconciler/pingsource" "knative.dev/eventing/pkg/reconciler/sequence" @@ -105,7 +105,7 @@ func main() { apiserversource.NewController, pingsource.NewController, containersource.NewController, - integrationsource.NewController, + source.NewController, // Sources CRD sourcecrd.NewController, diff --git a/pkg/reconciler/integration/helper.go b/pkg/reconciler/integration/helper.go new file mode 100644 index 00000000000..d77cc576340 --- /dev/null +++ b/pkg/reconciler/integration/helper.go @@ -0,0 +1,128 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "fmt" + "reflect" + "strconv" + "strings" + + corev1 "k8s.io/api/core/v1" +) + +const prefix = "CAMEL_KAMELET_" + +func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar { + var envVars []corev1.EnvVar + + // Use reflection to inspect the struct fields + v := reflect.ValueOf(s) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + t := v.Type() + + for i := 0; i < v.NumField(); i++ { + field := v.Field(i) + fieldType := t.Field(i) + + // Skip unexported fields + if !field.CanInterface() { + continue + } + + // Handle embedded/anonymous structs recursively + if fieldType.Anonymous && field.Kind() == reflect.Struct { + // Recursively handle embedded structs with the same prefix + envVars = append(envVars, GenerateEnvVarsFromStruct(prefix, field.Interface())...) + continue + } + + // First, check for the custom 'camel' tag + envVarName := fieldType.Tag.Get("camel") + if envVarName == "" { + // If 'camel' tag is not present, fall back to the 'json' tag or Go field name + jsonTag := fieldType.Tag.Get("json") + tagName := strings.Split(jsonTag, ",")[0] + if tagName == "" || tagName == "-" { + tagName = fieldType.Name + } + envVarName = fmt.Sprintf("%s_%s", prefix, strings.ToUpper(tagName)) + } + + if field.Kind() == reflect.Ptr { + if field.IsNil() { + continue + } + field = field.Elem() + } + + var value string + switch field.Kind() { + case reflect.Int, reflect.Int32, reflect.Int64: + value = strconv.FormatInt(field.Int(), 10) + case reflect.Bool: + value = strconv.FormatBool(field.Bool()) + case reflect.String: + value = field.String() + default: + // Skip unsupported types + continue + } + + // Skip zero/empty values + if value == "" { + continue + } + + envVars = append(envVars, corev1.EnvVar{ + Name: envVarName, + Value: value, + }) + } + + return envVars +} + +func MakeSecretEnvVar(name, key, secretName string) corev1.EnvVar { + return corev1.EnvVar{ + Name: name, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: key, + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + }, + }, + } +} + +func MakeSSLEnvVar() []corev1.EnvVar { + return []corev1.EnvVar{ + { + Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED", + Value: "true", + }, + { + Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH", + Value: "/knative-custom-certs/knative-eventing-bundle.pem", + }, + } +} diff --git a/pkg/reconciler/integration/helper_test.go b/pkg/reconciler/integration/helper_test.go new file mode 100644 index 00000000000..0da78460379 --- /dev/null +++ b/pkg/reconciler/integration/helper_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" +) + +func TestGenerateEnvVarsFromStruct(t *testing.T) { + type TestStruct struct { + Field1 int `json:"field1"` + Field2 bool `json:"field2"` + Field3 string `json:"field3"` + } + + prefix := "TEST_PREFIX" + input := &TestStruct{ + Field1: 123, + Field2: true, + Field3: "hello", + } + + // Expected environment variables including SSL settings + want := []corev1.EnvVar{ + {Name: "TEST_PREFIX_FIELD1", Value: "123"}, + {Name: "TEST_PREFIX_FIELD2", Value: "true"}, + {Name: "TEST_PREFIX_FIELD3", Value: "hello"}, + } + + got := GenerateEnvVarsFromStruct(prefix, input) + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("generateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff) + } +} + +func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) { + type AWSS3 struct { + Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"` + Region string `json:"region,omitempty"` + } + + prefix := "CAMEL_KAMELET_AWS_S3_SOURCE" + input := AWSS3{ + Arn: "arn:aws:s3:::example-bucket", + Region: "us-west-2", + } + + // Expected environment variables including SSL settings and camel tag for Arn + want := []corev1.EnvVar{ + {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"}, + {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"}, + } + + got := GenerateEnvVarsFromStruct(prefix, input) + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff) + } +} diff --git a/pkg/reconciler/integrationsource/controller.go b/pkg/reconciler/integration/source/controller.go similarity index 99% rename from pkg/reconciler/integrationsource/controller.go rename to pkg/reconciler/integration/source/controller.go index a84bec85815..751bed6ddf0 100644 --- a/pkg/reconciler/integrationsource/controller.go +++ b/pkg/reconciler/integration/source/controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "context" diff --git a/pkg/reconciler/integrationsource/controller_test.go b/pkg/reconciler/integration/source/controller_test.go similarity index 98% rename from pkg/reconciler/integrationsource/controller_test.go rename to pkg/reconciler/integration/source/controller_test.go index e4c21f26565..2b890e0c284 100644 --- a/pkg/reconciler/integrationsource/controller_test.go +++ b/pkg/reconciler/integration/source/controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "context" diff --git a/pkg/reconciler/integrationsource/integrationsource.go b/pkg/reconciler/integration/source/integrationsource.go similarity index 97% rename from pkg/reconciler/integrationsource/integrationsource.go rename to pkg/reconciler/integration/source/integrationsource.go index ca2f9aa95fa..c466058b4bc 100644 --- a/pkg/reconciler/integrationsource/integrationsource.go +++ b/pkg/reconciler/integration/source/integrationsource.go @@ -14,11 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "context" "fmt" + "knative.dev/eventing/pkg/reconciler/integration/source/resources" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -32,7 +33,6 @@ import ( "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/integrationsource" v1listers "knative.dev/eventing/pkg/client/listers/sources/v1" listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha1" - "knative.dev/eventing/pkg/reconciler/integrationsource/resources" "knative.dev/pkg/controller" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" diff --git a/pkg/reconciler/integrationsource/integrationsource_test.go b/pkg/reconciler/integration/source/integrationsource_test.go similarity index 99% rename from pkg/reconciler/integrationsource/integrationsource_test.go rename to pkg/reconciler/integration/source/integrationsource_test.go index 92dc884bfcf..d500c0cccbb 100644 --- a/pkg/reconciler/integrationsource/integrationsource_test.go +++ b/pkg/reconciler/integration/source/integrationsource_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package integrationsource +package source import ( "fmt" diff --git a/pkg/reconciler/integration/source/resources/containersource.go b/pkg/reconciler/integration/source/resources/containersource.go new file mode 100644 index 00000000000..8607867a6f2 --- /dev/null +++ b/pkg/reconciler/integration/source/resources/containersource.go @@ -0,0 +1,133 @@ +/* + Copyright 2024 The Knative Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package resources + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + "knative.dev/eventing/pkg/apis/sources/v1alpha1" + "knative.dev/eventing/pkg/reconciler/integration" + "knative.dev/pkg/kmeta" +) + +const ( + awsAccessKey = "aws.accessKey" + awsSecretKey = "aws.secretKey" +) + +func NewContainerSource(source *v1alpha1.IntegrationSource) *sourcesv1.ContainerSource { + return &sourcesv1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(source), + }, + Name: ContainerSourceName(source), + Namespace: source.Namespace, + }, + Spec: sourcesv1.ContainerSourceSpec{ + + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "source", + Image: selectImage(source), + ImagePullPolicy: corev1.PullIfNotPresent, + Env: makeEnv(source), + }, + }, + }, + }, + SourceSpec: source.Spec.SourceSpec, + }, + } +} + +// Function to create environment variables for Timer or AWS configurations dynamically +func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar { + var envVars = integration.MakeSSLEnvVar() + + // Timer environment variables + if source.Spec.Timer != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_TIMER_SOURCE", *source.Spec.Timer)...) + return envVars + } + + // Handle secret name only if AWS is configured + var secretName string + if source.Spec.Aws != nil && source.Spec.Aws.Auth != nil && source.Spec.Aws.Auth.Secret != nil && source.Spec.Aws.Auth.Secret.Ref != nil { + secretName = source.Spec.Aws.Auth.Secret.Ref.Name + } + + // AWS S3 environment variables + if source.Spec.Aws != nil && source.Spec.Aws.S3 != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SOURCE", *source.Spec.Aws.S3)...) + if secretName != "" { + envVars = append(envVars, []corev1.EnvVar{ + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", awsAccessKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", awsSecretKey, secretName), + }...) + } + return envVars + } + + // AWS SQS environment variables + if source.Spec.Aws != nil && source.Spec.Aws.SQS != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SOURCE", *source.Spec.Aws.SQS)...) + if secretName != "" { + envVars = append(envVars, []corev1.EnvVar{ + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", awsAccessKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", awsSecretKey, secretName), + }...) + } + return envVars + } + + // AWS DynamoDB Streams environment variables + if source.Spec.Aws != nil && source.Spec.Aws.DDBStreams != nil { + envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE", *source.Spec.Aws.DDBStreams)...) + if secretName != "" { + envVars = append(envVars, []corev1.EnvVar{ + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", awsAccessKey, secretName), + integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", awsSecretKey, secretName), + }...) + } + return envVars + } + + // If no valid configuration is found, return empty envVars + return envVars +} + +func selectImage(source *v1alpha1.IntegrationSource) string { + if source.Spec.Timer != nil { + return "gcr.io/knative-nightly/timer-source:latest" + } + if source.Spec.Aws != nil { + if source.Spec.Aws.S3 != nil { + return "gcr.io/knative-nightly/aws-s3-source:latest" + } + if source.Spec.Aws.SQS != nil { + return "gcr.io/knative-nightly/aws-sqs-source:latest" + } + if source.Spec.Aws.DDBStreams != nil { + return "gcr.io/knative-nightly/aws-ddb-streams-source:latest" + } + } + return "" +} diff --git a/pkg/reconciler/integrationsource/resources/containersource_test.go b/pkg/reconciler/integration/source/resources/containersource_test.go similarity index 66% rename from pkg/reconciler/integrationsource/resources/containersource_test.go rename to pkg/reconciler/integration/source/resources/containersource_test.go index b512fdd784a..54336ec6781 100644 --- a/pkg/reconciler/integrationsource/resources/containersource_test.go +++ b/pkg/reconciler/integration/source/resources/containersource_test.go @@ -101,56 +101,3 @@ func TestNewContainerSource(t *testing.T) { t.Errorf("NewContainerSource() mismatch (-want +got):\n%s", diff) } } - -func TestGenerateEnvVarsFromStruct(t *testing.T) { - type TestStruct struct { - Field1 int `json:"field1"` - Field2 bool `json:"field2"` - Field3 string `json:"field3"` - } - - prefix := "TEST_PREFIX" - input := &TestStruct{ - Field1: 123, - Field2: true, - Field3: "hello", - } - - // Expected environment variables including SSL settings - want := []corev1.EnvVar{ - {Name: "TEST_PREFIX_FIELD1", Value: "123"}, - {Name: "TEST_PREFIX_FIELD2", Value: "true"}, - {Name: "TEST_PREFIX_FIELD3", Value: "hello"}, - } - - got := generateEnvVarsFromStruct(prefix, input) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("generateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff) - } -} - -func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) { - type AWSS3 struct { - Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"` - Region string `json:"region,omitempty"` - } - - prefix := "CAMEL_KAMELET_AWS_S3_SOURCE" - input := AWSS3{ - Arn: "arn:aws:s3:::example-bucket", - Region: "us-west-2", - } - - // Expected environment variables including SSL settings and camel tag for Arn - want := []corev1.EnvVar{ - {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"}, - {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"}, - } - - got := generateEnvVarsFromStruct(prefix, input) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff) - } -} diff --git a/pkg/reconciler/integrationsource/resources/names.go b/pkg/reconciler/integration/source/resources/names.go similarity index 100% rename from pkg/reconciler/integrationsource/resources/names.go rename to pkg/reconciler/integration/source/resources/names.go diff --git a/pkg/reconciler/integrationsource/resources/containersource.go b/pkg/reconciler/integrationsource/resources/containersource.go deleted file mode 100644 index 8e55bcee889..00000000000 --- a/pkg/reconciler/integrationsource/resources/containersource.go +++ /dev/null @@ -1,237 +0,0 @@ -/* - Copyright 2024 The Knative Authors - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package resources - -import ( - "fmt" - "reflect" - "strconv" - "strings" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" - "knative.dev/eventing/pkg/apis/sources/v1alpha1" - "knative.dev/pkg/kmeta" -) - -const ( - awsAccessKey = "aws.accessKey" - awsSecretKey = "aws.secretKey" -) - -func NewContainerSource(source *v1alpha1.IntegrationSource) *sourcesv1.ContainerSource { - return &sourcesv1.ContainerSource{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(source), - }, - Name: ContainerSourceName(source), - Namespace: source.Namespace, - }, - Spec: sourcesv1.ContainerSourceSpec{ - - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "source", - Image: selectImage(source), - ImagePullPolicy: corev1.PullIfNotPresent, - Env: makeEnv(source), - }, - }, - }, - }, - SourceSpec: source.Spec.SourceSpec, - }, - } -} - -func generateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar { - var envVars []corev1.EnvVar - - // Use reflection to inspect the struct fields - v := reflect.ValueOf(s) - if v.Kind() == reflect.Ptr { - v = v.Elem() - } - - t := v.Type() - - for i := 0; i < v.NumField(); i++ { - field := v.Field(i) - fieldType := t.Field(i) - - // Skip unexported fields - if !field.CanInterface() { - continue - } - - // Handle embedded/anonymous structs recursively - if fieldType.Anonymous && field.Kind() == reflect.Struct { - // Recursively handle embedded structs with the same prefix - envVars = append(envVars, generateEnvVarsFromStruct(prefix, field.Interface())...) - continue - } - - // First, check for the custom 'camel' tag - envVarName := fieldType.Tag.Get("camel") - if envVarName == "" { - // If 'camel' tag is not present, fall back to the 'json' tag or Go field name - jsonTag := fieldType.Tag.Get("json") - tagName := strings.Split(jsonTag, ",")[0] - if tagName == "" || tagName == "-" { - tagName = fieldType.Name - } - envVarName = fmt.Sprintf("%s_%s", prefix, strings.ToUpper(tagName)) - } - - if field.Kind() == reflect.Ptr { - if field.IsNil() { - continue - } - field = field.Elem() - } - - var value string - switch field.Kind() { - case reflect.Int, reflect.Int32, reflect.Int64: - value = strconv.FormatInt(field.Int(), 10) - case reflect.Bool: - value = strconv.FormatBool(field.Bool()) - case reflect.String: - value = field.String() - default: - // Skip unsupported types - continue - } - - // Skip zero/empty values - if value == "" { - continue - } - - envVars = append(envVars, corev1.EnvVar{ - Name: envVarName, - Value: value, - }) - } - - return envVars -} - -// Function to create environment variables for Timer or AWS configurations dynamically -func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar { - var envVars = makeSSLEnvVar() - - // Timer environment variables - if source.Spec.Timer != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_TIMER_SOURCE", *source.Spec.Timer)...) - return envVars - } - - // Handle secret name only if AWS is configured - var secretName string - if source.Spec.Aws != nil && source.Spec.Aws.Auth != nil && source.Spec.Aws.Auth.Secret != nil && source.Spec.Aws.Auth.Secret.Ref != nil { - secretName = source.Spec.Aws.Auth.Secret.Ref.Name - } - - // AWS S3 environment variables - if source.Spec.Aws != nil && source.Spec.Aws.S3 != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SOURCE", *source.Spec.Aws.S3)...) - if secretName != "" { - envVars = append(envVars, []corev1.EnvVar{ - makeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", awsAccessKey, secretName), - makeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", awsSecretKey, secretName), - }...) - } - return envVars - } - - // AWS SQS environment variables - if source.Spec.Aws != nil && source.Spec.Aws.SQS != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SOURCE", *source.Spec.Aws.SQS)...) - if secretName != "" { - envVars = append(envVars, []corev1.EnvVar{ - makeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", awsAccessKey, secretName), - makeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", awsSecretKey, secretName), - }...) - } - return envVars - } - - // AWS DynamoDB Streams environment variables - if source.Spec.Aws != nil && source.Spec.Aws.DDBStreams != nil { - envVars = append(envVars, generateEnvVarsFromStruct("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE", *source.Spec.Aws.DDBStreams)...) - if secretName != "" { - envVars = append(envVars, []corev1.EnvVar{ - makeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", awsAccessKey, secretName), - makeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", awsSecretKey, secretName), - }...) - } - return envVars - } - - // If no valid configuration is found, return empty envVars - return envVars -} - -func makeSSLEnvVar() []corev1.EnvVar { - return []corev1.EnvVar{ - { - Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED", - Value: "true", - }, - { - Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH", - Value: "/knative-custom-certs/knative-eventing-bundle.pem", - }, - } -} - -func selectImage(source *v1alpha1.IntegrationSource) string { - if source.Spec.Timer != nil { - return "gcr.io/knative-nightly/timer-source:latest" - } - if source.Spec.Aws != nil { - if source.Spec.Aws.S3 != nil { - return "gcr.io/knative-nightly/aws-s3-source:latest" - } - if source.Spec.Aws.SQS != nil { - return "gcr.io/knative-nightly/aws-sqs-source:latest" - } - if source.Spec.Aws.DDBStreams != nil { - return "gcr.io/knative-nightly/aws-ddb-streams-source:latest" - } - } - return "" -} - -func makeSecretEnvVar(name, key, secretName string) corev1.EnvVar { - return corev1.EnvVar{ - Name: name, - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - Key: key, - LocalObjectReference: corev1.LocalObjectReference{ - Name: secretName, - }, - }, - }, - } -}