Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Allow consolidated channel to use distributed-style secret format #680

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
commonconstants "knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -599,7 +600,7 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co
}

logger.Info("Reloading Kafka configuration")
kafkaConfig, err := utils.GetKafkaConfig(ctx, controllerAgentName, configMap.Data, utils.GetKafkaAuthData)
kafkaConfig, err := utils.GetKafkaConfig(ctx, controllerAgentName, configMap.Data, kafkasarama.LoadAuthConfig)
if err != nil {
logger.Errorw("Error reading Kafka configuration", zap.Error(err))
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
"go.uber.org/zap"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka/pkg/common/configmaploader"
"knative.dev/eventing-kafka/pkg/common/constants"

"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
Expand All @@ -47,6 +43,9 @@ import (
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
listers "knative.dev/eventing-kafka/pkg/client/listers/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/common/configmaploader"
"knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
)

const dispatcherClientId = "kafka-ch-dispatcher"
Expand Down Expand Up @@ -90,7 +89,7 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
logger.Fatalw("error loading configuration", zap.Error(err))
}

kafkaConfig, err := utils.GetKafkaConfig(ctx, dispatcherClientId, configMap, utils.GetKafkaAuthData)
kafkaConfig, err := utils.GetKafkaConfig(ctx, dispatcherClientId, configMap, kafkasarama.LoadAuthConfig)
if err != nil {
logger.Fatalw("Error loading kafka config", zap.Error(err))
}
Expand Down
76 changes: 5 additions & 71 deletions pkg/channel/consolidated/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,16 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing-kafka/pkg/common/constants"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/kafka/sarama"
kubeclient "knative.dev/pkg/client/injection/kube/client"
)

const (
Expand All @@ -44,14 +39,6 @@ const (
MaxIdleConnectionsKey = "maxIdleConns"
MaxIdleConnectionsPerHostKey = "maxIdleConnsPerHost"

TlsEnabled = "tls.enabled"
TlsCacert = "ca.crt"
TlsUsercert = "user.crt"
TlsUserkey = "user.key"
SaslUser = "user"
SaslPassword = "password"
SaslType = "saslType"

KafkaChannelSeparator = "."

knativeKafkaTopicPrefix = "knative-messaging-kafka"
Expand All @@ -62,64 +49,6 @@ type KafkaConfig struct {
EventingKafka *config.EventingKafkaConfig
}

func parseTls(secret *corev1.Secret, kafkaAuthConfig *client.KafkaAuthConfig) {

// self-signed CERTs we need CA CERT, USER CERT and KEy
if string(secret.Data[TlsCacert]) != "" {
// We have a self-signed TLS cert
tls := &client.KafkaTlsConfig{
Cacert: string(secret.Data[TlsCacert]),
Usercert: string(secret.Data[TlsUsercert]),
Userkey: string(secret.Data[TlsUserkey]),
}
kafkaAuthConfig.TLS = tls
} else {
// Public CERTS from a proper CA do not need this,
// we can just say `tls.enabled: true`
tlsEnabled, err := strconv.ParseBool(string(secret.Data[TlsEnabled]))
if err != nil {
tlsEnabled = false
}
if tlsEnabled {
// Looks like TLS is desired/enabled:
kafkaAuthConfig.TLS = &client.KafkaTlsConfig{}
}
}
}

func parseSasl(secret *corev1.Secret, kafkaAuthConfig *client.KafkaAuthConfig) {
if string(secret.Data[SaslUser]) != "" {
sasl := &client.KafkaSaslConfig{
User: string(secret.Data[SaslUser]),
Password: string(secret.Data[SaslPassword]),
SaslType: string(secret.Data[SaslType]),
}
kafkaAuthConfig.SASL = sasl
}
}

// GetKafkaAuthData reads auth information from the Secret and puts them into a KafkaAuthConfig struct
// GetKafkaAuthData returns a nil error in all cases because it matches the sarama.GetAuth prototype
// (so that it can be used in the sarama.LoadSettings call).
func GetKafkaAuthData(ctx context.Context, secretname string, secretNS string) (*client.KafkaAuthConfig, error) {

k8sClient := kubeclient.Get(ctx)
secret, err := k8sClient.CoreV1().Secrets(secretNS).Get(ctx, secretname, metav1.GetOptions{})

if err != nil || secret == nil {
logging.FromContext(ctx).Errorf("Referenced Auth Secret not found")
return nil, nil // For the consolidated channel type, the secret not existing is not an error
}

kafkaAuthConfig := &client.KafkaAuthConfig{}

// check for TLS and SASL options
parseTls(secret, kafkaAuthConfig)
parseSasl(secret, kafkaAuthConfig)

return kafkaAuthConfig, nil
}

// GetKafkaConfig returns the details of the Kafka cluster.
func GetKafkaConfig(ctx context.Context, clientId string, configMap map[string]string, getAuth sarama.GetAuth) (*KafkaConfig, error) {
if len(configMap) == 0 {
Expand Down Expand Up @@ -153,13 +82,18 @@ func GetKafkaConfig(ctx context.Context, clientId string, configMap map[string]s
configmap.AsInt(MaxIdleConnectionsKey, &eventingKafkaConfig.CloudEvents.MaxIdleConns),
configmap.AsInt(MaxIdleConnectionsPerHostKey, &eventingKafkaConfig.CloudEvents.MaxIdleConnsPerHost),
)
// Since LoadSettings isn't going to be called in this situation, we need to call getAuth explicitly
eventingKafkaConfig.Auth = getAuth(ctx, eventingKafkaConfig.Kafka.AuthSecretName, eventingKafkaConfig.Kafka.AuthSecretNamespace)
} else {
eventingKafkaConfig, err = sarama.LoadSettings(ctx, clientId, configMap, getAuth)
}
if err != nil {
return nil, err
}

// Enable Sarama logging if specified in the ConfigMap
sarama.EnableSaramaLogging(eventingKafkaConfig.Sarama.EnableLogging)

if eventingKafkaConfig.Kafka.Brokers == "" {
return nil, errors.New("missing or empty brokers in configuration")
}
Expand Down
42 changes: 25 additions & 17 deletions pkg/channel/consolidated/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package utils

import (
"context"
"testing"

"crypto/tls"
"crypto/x509"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand All @@ -38,6 +37,7 @@ import (

"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/config"
configtesting "knative.dev/eventing-kafka/pkg/common/config/testing"
"knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
)
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestGenerateTopicNameWithHyphen(t *testing.T) {
}
}

func TestGetKafkaAuthData(t *testing.T) {
func TestGetKafkaConfig_BackwardsCompatibility(t *testing.T) {

api := &KubernetesAPI{
Client: fake.NewSimpleClientset(),
Expand All @@ -90,7 +90,7 @@ func TestGetKafkaAuthData(t *testing.T) {
corev1Input: api.Client.CoreV1(),
secretName: secretName,
secretNS: secretNamespace,
expected: &client.KafkaAuthConfig{},
expected: nil, // The Sarama builder expects a nil KafkaAuthConfig if no authentication is desired
},
"wrong secret name": {
corev1Input: api.Client.CoreV1(),
Expand Down Expand Up @@ -121,6 +121,7 @@ func TestGetKafkaAuthData(t *testing.T) {
SASL: &client.KafkaSaslConfig{
User: "user",
Password: "password",
SaslType: configtesting.DefaultSecretSaslType,
},
},
},
Expand Down Expand Up @@ -152,6 +153,7 @@ func TestGetKafkaAuthData(t *testing.T) {
SASL: &client.KafkaSaslConfig{
User: "user",
Password: "password",
SaslType: configtesting.DefaultSecretSaslType,
},
},
},
Expand Down Expand Up @@ -186,14 +188,18 @@ func TestGetKafkaAuthData(t *testing.T) {
assert.Nil(t, secretCreateError, "error creating secret resources")

ctx := context.WithValue(context.Background(), injectionclient.Key{}, fake.NewSimpleClientset(secret, namespace))
receivedSecret, err := GetKafkaAuthData(ctx, tc.secretName, tc.secretNS)

configMap := map[string]string{
"bootstrapServers": "kafkabroker.kafka:9092",
"authSecretName": tc.secretName,
"authSecretNamespace": tc.secretNS}
kafkaConfig, err := GetKafkaConfig(ctx, "test-client-id", configMap, kafkasarama.LoadAuthConfig)
assert.Nil(t, err)
if tc.expected == nil {
assert.Nil(t, receivedSecret)
assert.Nil(t, kafkaConfig.EventingKafka.Auth)
} else {
assert.Nil(t, err)
assert.Equal(t, tc.expected.TLS, receivedSecret.TLS)
assert.Equal(t, tc.expected.SASL, receivedSecret.SASL)
assert.Equal(t, tc.expected.TLS, kafkaConfig.EventingKafka.Auth.TLS)
assert.Equal(t, tc.expected.SASL, kafkaConfig.EventingKafka.Auth.SASL)
}

// clean up after test
Expand Down Expand Up @@ -365,7 +371,9 @@ func TestGetKafkaConfig(t *testing.T) {
EventingKafka: &config.EventingKafkaConfig{
Channel: config.EKChannelConfig{
Dispatcher: config.EKDispatcherConfig{EKKubernetesConfig: defaultK8SConfig},
Receiver: config.EKReceiverConfig{EKKubernetesConfig: defaultK8SConfig},
// The consolidated channel doesn't use the Receiver config, but there are default values in it
// (namely "Replicas") that make the zero-value of the Receiver struct invalid for comparisons
Receiver: config.EKReceiverConfig{EKKubernetesConfig: defaultK8SConfig},
},
CloudEvents: defaultCloudEvents,
Kafka: config.EKKafkaConfig{
Expand All @@ -386,7 +394,7 @@ func TestGetKafkaConfig(t *testing.T) {
t.Logf("Running %s", t.Name())

got, err := GetKafkaConfig(context.TODO(), "test-client-id", tc.data,
func(context.Context, string, string) (*client.KafkaAuthConfig, error) { return nil, nil })
func(context.Context, string, string) *client.KafkaAuthConfig { return nil })

if tc.getError != "" {
assert.NotNil(t, err)
Expand Down Expand Up @@ -475,12 +483,12 @@ func createSecret(cacert string, usercert string, userkey string, user string, p
Name: secretName,
},
Data: map[string][]byte{
TlsCacert: []byte(cacert),
TlsUsercert: []byte(usercert),
TlsUserkey: []byte(userkey),
TlsEnabled: []byte(tlsEnabled),
SaslUser: []byte(user),
SaslPassword: []byte(password),
config.TlsCacert: []byte(cacert),
config.TlsUsercert: []byte(usercert),
config.TlsUserkey: []byte(userkey),
config.TlsEnabled: []byte(tlsEnabled),
config.SaslUser: []byte(user),
config.SaslPassword: []byte(password),
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
)

Expand Down Expand Up @@ -1100,7 +1099,6 @@ func TestReconciler_updateKafkaConfig(t *testing.T) {
{
name: "Empty Eventing-Kafka YAML (default for authSecretName)",
configMap: &corev1.ConfigMap{Data: map[string]string{constants.EventingKafkaSettingsConfigKey: ""}},
expectErr: `^could not load auth config: secrets "` + kafkasarama.DefaultAuthSecretName + `" not found$`,
},
{
name: "Invalid Eventing-Kafka YAML",
Expand Down
Loading