Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Allow consolidated channel to use distributed-style secret format (#680)
Browse files Browse the repository at this point in the history
* Allow consolidated channel to use distributed-style secret format (with configmap)

* Added test coverage for moved code

* Didn't notice that the old secret was 'saslType' and not 'sasltype'

* spacing

* Add coverage

* Changes from PR review
  • Loading branch information
eric-sap authored Jun 4, 2021
1 parent 396db16 commit b516810
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
commonconstants "knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -599,7 +600,7 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co
}

logger.Info("Reloading Kafka configuration")
kafkaConfig, err := utils.GetKafkaConfig(ctx, controllerAgentName, configMap.Data, utils.GetKafkaAuthData)
kafkaConfig, err := utils.GetKafkaConfig(ctx, controllerAgentName, configMap.Data, kafkasarama.LoadAuthConfig)
if err != nil {
logger.Errorw("Error reading Kafka configuration", zap.Error(err))
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
"go.uber.org/zap"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka/pkg/common/configmaploader"
"knative.dev/eventing-kafka/pkg/common/constants"

"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
Expand All @@ -47,6 +43,9 @@ import (
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
listers "knative.dev/eventing-kafka/pkg/client/listers/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/common/configmaploader"
"knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
)

const dispatcherClientId = "kafka-ch-dispatcher"
Expand Down Expand Up @@ -90,7 +89,7 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
logger.Fatalw("error loading configuration", zap.Error(err))
}

kafkaConfig, err := utils.GetKafkaConfig(ctx, dispatcherClientId, configMap, utils.GetKafkaAuthData)
kafkaConfig, err := utils.GetKafkaConfig(ctx, dispatcherClientId, configMap, kafkasarama.LoadAuthConfig)
if err != nil {
logger.Fatalw("Error loading kafka config", zap.Error(err))
}
Expand Down
76 changes: 5 additions & 71 deletions pkg/channel/consolidated/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,16 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing-kafka/pkg/common/constants"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/kafka/sarama"
kubeclient "knative.dev/pkg/client/injection/kube/client"
)

const (
Expand All @@ -44,14 +39,6 @@ const (
MaxIdleConnectionsKey = "maxIdleConns"
MaxIdleConnectionsPerHostKey = "maxIdleConnsPerHost"

TlsEnabled = "tls.enabled"
TlsCacert = "ca.crt"
TlsUsercert = "user.crt"
TlsUserkey = "user.key"
SaslUser = "user"
SaslPassword = "password"
SaslType = "saslType"

KafkaChannelSeparator = "."

knativeKafkaTopicPrefix = "knative-messaging-kafka"
Expand All @@ -62,64 +49,6 @@ type KafkaConfig struct {
EventingKafka *config.EventingKafkaConfig
}

func parseTls(secret *corev1.Secret, kafkaAuthConfig *client.KafkaAuthConfig) {

// self-signed CERTs we need CA CERT, USER CERT and KEy
if string(secret.Data[TlsCacert]) != "" {
// We have a self-signed TLS cert
tls := &client.KafkaTlsConfig{
Cacert: string(secret.Data[TlsCacert]),
Usercert: string(secret.Data[TlsUsercert]),
Userkey: string(secret.Data[TlsUserkey]),
}
kafkaAuthConfig.TLS = tls
} else {
// Public CERTS from a proper CA do not need this,
// we can just say `tls.enabled: true`
tlsEnabled, err := strconv.ParseBool(string(secret.Data[TlsEnabled]))
if err != nil {
tlsEnabled = false
}
if tlsEnabled {
// Looks like TLS is desired/enabled:
kafkaAuthConfig.TLS = &client.KafkaTlsConfig{}
}
}
}

func parseSasl(secret *corev1.Secret, kafkaAuthConfig *client.KafkaAuthConfig) {
if string(secret.Data[SaslUser]) != "" {
sasl := &client.KafkaSaslConfig{
User: string(secret.Data[SaslUser]),
Password: string(secret.Data[SaslPassword]),
SaslType: string(secret.Data[SaslType]),
}
kafkaAuthConfig.SASL = sasl
}
}

// GetKafkaAuthData reads auth information from the Secret and puts them into a KafkaAuthConfig struct
// GetKafkaAuthData returns a nil error in all cases because it matches the sarama.GetAuth prototype
// (so that it can be used in the sarama.LoadSettings call).
func GetKafkaAuthData(ctx context.Context, secretname string, secretNS string) (*client.KafkaAuthConfig, error) {

k8sClient := kubeclient.Get(ctx)
secret, err := k8sClient.CoreV1().Secrets(secretNS).Get(ctx, secretname, metav1.GetOptions{})

if err != nil || secret == nil {
logging.FromContext(ctx).Errorf("Referenced Auth Secret not found")
return nil, nil // For the consolidated channel type, the secret not existing is not an error
}

kafkaAuthConfig := &client.KafkaAuthConfig{}

// check for TLS and SASL options
parseTls(secret, kafkaAuthConfig)
parseSasl(secret, kafkaAuthConfig)

return kafkaAuthConfig, nil
}

// GetKafkaConfig returns the details of the Kafka cluster.
func GetKafkaConfig(ctx context.Context, clientId string, configMap map[string]string, getAuth sarama.GetAuth) (*KafkaConfig, error) {
if len(configMap) == 0 {
Expand Down Expand Up @@ -153,13 +82,18 @@ func GetKafkaConfig(ctx context.Context, clientId string, configMap map[string]s
configmap.AsInt(MaxIdleConnectionsKey, &eventingKafkaConfig.CloudEvents.MaxIdleConns),
configmap.AsInt(MaxIdleConnectionsPerHostKey, &eventingKafkaConfig.CloudEvents.MaxIdleConnsPerHost),
)
// Since LoadSettings isn't going to be called in this situation, we need to call getAuth explicitly
eventingKafkaConfig.Auth = getAuth(ctx, eventingKafkaConfig.Kafka.AuthSecretName, eventingKafkaConfig.Kafka.AuthSecretNamespace)
} else {
eventingKafkaConfig, err = sarama.LoadSettings(ctx, clientId, configMap, getAuth)
}
if err != nil {
return nil, err
}

// Enable Sarama logging if specified in the ConfigMap
sarama.EnableSaramaLogging(eventingKafkaConfig.Sarama.EnableLogging)

if eventingKafkaConfig.Kafka.Brokers == "" {
return nil, errors.New("missing or empty brokers in configuration")
}
Expand Down
42 changes: 25 additions & 17 deletions pkg/channel/consolidated/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package utils

import (
"context"
"testing"

"crypto/tls"
"crypto/x509"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand All @@ -38,6 +37,7 @@ import (

"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/config"
configtesting "knative.dev/eventing-kafka/pkg/common/config/testing"
"knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
)
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestGenerateTopicNameWithHyphen(t *testing.T) {
}
}

func TestGetKafkaAuthData(t *testing.T) {
func TestGetKafkaConfig_BackwardsCompatibility(t *testing.T) {

api := &KubernetesAPI{
Client: fake.NewSimpleClientset(),
Expand All @@ -90,7 +90,7 @@ func TestGetKafkaAuthData(t *testing.T) {
corev1Input: api.Client.CoreV1(),
secretName: secretName,
secretNS: secretNamespace,
expected: &client.KafkaAuthConfig{},
expected: nil, // The Sarama builder expects a nil KafkaAuthConfig if no authentication is desired
},
"wrong secret name": {
corev1Input: api.Client.CoreV1(),
Expand Down Expand Up @@ -121,6 +121,7 @@ func TestGetKafkaAuthData(t *testing.T) {
SASL: &client.KafkaSaslConfig{
User: "user",
Password: "password",
SaslType: configtesting.DefaultSecretSaslType,
},
},
},
Expand Down Expand Up @@ -152,6 +153,7 @@ func TestGetKafkaAuthData(t *testing.T) {
SASL: &client.KafkaSaslConfig{
User: "user",
Password: "password",
SaslType: configtesting.DefaultSecretSaslType,
},
},
},
Expand Down Expand Up @@ -186,14 +188,18 @@ func TestGetKafkaAuthData(t *testing.T) {
assert.Nil(t, secretCreateError, "error creating secret resources")

ctx := context.WithValue(context.Background(), injectionclient.Key{}, fake.NewSimpleClientset(secret, namespace))
receivedSecret, err := GetKafkaAuthData(ctx, tc.secretName, tc.secretNS)

configMap := map[string]string{
"bootstrapServers": "kafkabroker.kafka:9092",
"authSecretName": tc.secretName,
"authSecretNamespace": tc.secretNS}
kafkaConfig, err := GetKafkaConfig(ctx, "test-client-id", configMap, kafkasarama.LoadAuthConfig)
assert.Nil(t, err)
if tc.expected == nil {
assert.Nil(t, receivedSecret)
assert.Nil(t, kafkaConfig.EventingKafka.Auth)
} else {
assert.Nil(t, err)
assert.Equal(t, tc.expected.TLS, receivedSecret.TLS)
assert.Equal(t, tc.expected.SASL, receivedSecret.SASL)
assert.Equal(t, tc.expected.TLS, kafkaConfig.EventingKafka.Auth.TLS)
assert.Equal(t, tc.expected.SASL, kafkaConfig.EventingKafka.Auth.SASL)
}

// clean up after test
Expand Down Expand Up @@ -365,7 +371,9 @@ func TestGetKafkaConfig(t *testing.T) {
EventingKafka: &config.EventingKafkaConfig{
Channel: config.EKChannelConfig{
Dispatcher: config.EKDispatcherConfig{EKKubernetesConfig: defaultK8SConfig},
Receiver: config.EKReceiverConfig{EKKubernetesConfig: defaultK8SConfig},
// The consolidated channel doesn't use the Receiver config, but there are default values in it
// (namely "Replicas") that make the zero-value of the Receiver struct invalid for comparisons
Receiver: config.EKReceiverConfig{EKKubernetesConfig: defaultK8SConfig},
},
CloudEvents: defaultCloudEvents,
Kafka: config.EKKafkaConfig{
Expand All @@ -386,7 +394,7 @@ func TestGetKafkaConfig(t *testing.T) {
t.Logf("Running %s", t.Name())

got, err := GetKafkaConfig(context.TODO(), "test-client-id", tc.data,
func(context.Context, string, string) (*client.KafkaAuthConfig, error) { return nil, nil })
func(context.Context, string, string) *client.KafkaAuthConfig { return nil })

if tc.getError != "" {
assert.NotNil(t, err)
Expand Down Expand Up @@ -475,12 +483,12 @@ func createSecret(cacert string, usercert string, userkey string, user string, p
Name: secretName,
},
Data: map[string][]byte{
TlsCacert: []byte(cacert),
TlsUsercert: []byte(usercert),
TlsUserkey: []byte(userkey),
TlsEnabled: []byte(tlsEnabled),
SaslUser: []byte(user),
SaslPassword: []byte(password),
config.TlsCacert: []byte(cacert),
config.TlsUsercert: []byte(usercert),
config.TlsUserkey: []byte(userkey),
config.TlsEnabled: []byte(tlsEnabled),
config.SaslUser: []byte(user),
config.SaslPassword: []byte(password),
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
kafkachannelreconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/eventing-kafka/pkg/common/constants"
kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
)

Expand Down Expand Up @@ -1100,7 +1099,6 @@ func TestReconciler_updateKafkaConfig(t *testing.T) {
{
name: "Empty Eventing-Kafka YAML (default for authSecretName)",
configMap: &corev1.ConfigMap{Data: map[string]string{constants.EventingKafkaSettingsConfigKey: ""}},
expectErr: `^could not load auth config: secrets "` + kafkasarama.DefaultAuthSecretName + `" not found$`,
},
{
name: "Invalid Eventing-Kafka YAML",
Expand Down
Loading

0 comments on commit b516810

Please sign in to comment.