refactor apache kafka scaler config
Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com>
wozniakjan committed May 15, 2024
1 parent a7e999c commit 02fd305
Showing 3 changed files with 93 additions and 194 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037))
- **Apache Kafka Scaler**: Refactor config parser ([#5797](https://github.com/kedacore/keda/issues/5797))

#### Experimental

228 changes: 63 additions & 165 deletions pkg/scalers/apache_kafka_scaler.go
@@ -23,7 +23,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"strconv"
"strings"

"github.com/go-logr/logr"
@@ -49,20 +48,20 @@ type apacheKafkaScaler struct {
}

type apacheKafkaMetadata struct {
bootstrapServers []string
group string
topic []string
partitionLimitation []int32
lagThreshold int64
activationLagThreshold int64
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
excludePersistentLag bool
BootstrapServers []string `keda:"name=bootstrapServers, order=triggerMetadata;resolvedEnv"`
Group string `keda:"name=consumerGroup, order=triggerMetadata;resolvedEnv"`
Topic []string `keda:"name=topic, order=triggerMetadata;resolvedEnv, optional"`
PartitionLimitation []int `keda:"name=partitionLimitation, order=triggerMetadata, optional, rangeSeparator=-"`
LagThreshold int64 `keda:"name=lagThreshold, order=triggerMetadata, default=10"`
ActivationLagThreshold int64 `keda:"name=activationLagThreshold, order=triggerMetadata, default=0"`
OffsetResetPolicy offsetResetPolicy `keda:"name=offsetResetPolicy, order=triggerMetadata, enum=earliest;latest, default=latest"`
AllowIdleConsumers bool `keda:"name=allowIdleConsumers, order=triggerMetadata, optional"`
ExcludePersistentLag bool `keda:"name=excludePersistentLag, order=triggerMetadata, optional"`

// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612
scaleToZeroOnInvalidOffset bool
limitToPartitionsWithLag bool
ScaleToZeroOnInvalidOffset bool `keda:"name=scaleToZeroOnInvalidOffset, order=triggerMetadata, optional"`
LimitToPartitionsWithLag bool `keda:"name=limitToPartitionsWithLag, order=triggerMetadata, optional"`

// SASL
saslType kafkaSaslType
@@ -84,6 +83,26 @@ type apacheKafkaMetadata struct {
triggerIndex int
}
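The `keda:` struct tags above replace the hand-written parsing further down: `order` lists the sources to read from (trigger metadata, then resolved environment variables), `default` and `enum` cover the old fallbacks and the offsetResetPolicy check, and `rangeSeparator=-` takes over the range expansion previously done with `kedautil.ParseInt32List`. A rough sketch of how a unit test in package `scalers` could exercise this; the expected values are inferred from the tags and the removed code, not verified here:

```go
func TestApacheKafkaTypedConfigSketch(t *testing.T) {
	cfg := &scalersconfig.ScalerConfig{
		TriggerMetadata: map[string]string{
			"bootstrapServers":    "kafka-0:9092,kafka-1:9092",
			"consumerGroup":       "my-group",
			"topic":               "orders",
			"lagThreshold":        "50",
			"partitionLimitation": "0,1,4-8",
		},
	}
	meta := apacheKafkaMetadata{}
	if err := cfg.TypedConfig(&meta); err != nil {
		t.Fatalf("unexpected parse error: %v", err)
	}
	if meta.Group != "my-group" || meta.LagThreshold != 50 {
		t.Errorf("unexpected metadata: %+v", meta)
	}
	// Assumed from the tags and the removed manual parsing:
	//   BootstrapServers is split on "," into the two brokers,
	//   OffsetResetPolicy falls back to the enum default "latest",
	//   PartitionLimitation expands "4-8" because of rangeSeparator=-.
}
```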

func (a *apacheKafkaMetadata) Validate() error {
if a.LagThreshold <= 0 {
return fmt.Errorf("lagThreshold must be a positive number")
}
if a.ActivationLagThreshold < 0 {
return fmt.Errorf("activationLagThreshold must be a positive number")
}
if a.AllowIdleConsumers && a.LimitToPartitionsWithLag {
return fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
}
if len(a.Topic) == 0 && a.LimitToPartitionsWithLag {
return fmt.Errorf("topic must be specified when using limitToPartitionsWithLag")
}
if len(a.Topic) == 0 && len(a.PartitionLimitation) > 0 {
// no specific topics set, ignoring partitionLimitation setting
a.PartitionLimitation = nil
}
return nil
}
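`Validate` is never called explicitly in `parseApacheKafkaMetadata` below, so the assumption here is that `TypedConfig` invokes it on metadata types that implement it. Note that the last branch now clears `PartitionLimitation` silently when no topics are set, where the removed code also logged that the setting is ignored. A minimal, illustrative fragment (values made up) exercising the cross-field rules:

```go
func validateSketch() {
	bad := apacheKafkaMetadata{
		LagThreshold:             10,
		AllowIdleConsumers:       true,
		LimitToPartitionsWithLag: true,
	}
	if err := bad.Validate(); err != nil {
		// "allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously"
		fmt.Println(err)
	}
}
```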

const (
KafkaSASLTypeMskIam = "aws_msk_iam"
)
@@ -229,136 +248,15 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
}

func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (apacheKafkaMetadata, error) {
meta := apacheKafkaMetadata{}
switch {
case config.TriggerMetadata["bootstrapServersFromEnv"] != "":
meta.bootstrapServers = strings.Split(config.ResolvedEnv[config.TriggerMetadata["bootstrapServersFromEnv"]], ",")
case config.TriggerMetadata["bootstrapServers"] != "":
meta.bootstrapServers = strings.Split(config.TriggerMetadata["bootstrapServers"], ",")
default:
return meta, errors.New("no bootstrapServers given")
}

switch {
case config.TriggerMetadata["consumerGroupFromEnv"] != "":
meta.group = config.ResolvedEnv[config.TriggerMetadata["consumerGroupFromEnv"]]
case config.TriggerMetadata["consumerGroup"] != "":
meta.group = config.TriggerMetadata["consumerGroup"]
default:
return meta, errors.New("no consumer group given")
}

switch {
case config.TriggerMetadata["topicFromEnv"] != "":
meta.topic = strings.Split(config.ResolvedEnv[config.TriggerMetadata["topicFromEnv"]], ",")
case config.TriggerMetadata["topic"] != "":
meta.topic = strings.Split(config.TriggerMetadata["topic"], ",")
default:
meta.topic = []string{}
logger.V(1).Info(fmt.Sprintf("consumer group %q has no topics specified, "+
"will use all topics subscribed by the consumer group for scaling", meta.group))
}

meta.partitionLimitation = nil
partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"])
if partitionLimitationMetadata != "" {
if meta.topic == nil || len(meta.topic) == 0 {
logger.V(1).Info("no specific topics set, ignoring partitionLimitation setting")
} else {
pattern := config.TriggerMetadata["partitionLimitation"]
parsed, err := kedautil.ParseInt32List(pattern)
if err != nil {
return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %w", pattern, err)
}
meta.partitionLimitation = parsed
logger.V(0).Info(fmt.Sprintf("partition limit active '%s'", pattern))
}
}

meta.offsetResetPolicy = defaultOffsetResetPolicy

if config.TriggerMetadata["offsetResetPolicy"] != "" {
policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"])
if policy != earliest && policy != latest {
return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy)
}
meta.offsetResetPolicy = policy
}

meta.lagThreshold = defaultKafkaLagThreshold

if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %q: %w", lagThresholdMetricName, err)
}
if t <= 0 {
return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName)
}
meta.lagThreshold = t
}

meta.activationLagThreshold = defaultKafkaActivationLagThreshold

if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %q: %w", activationLagThresholdMetricName, err)
}
if t < 0 {
return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName)
}
meta.activationLagThreshold = t
meta := apacheKafkaMetadata{triggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, fmt.Errorf("error parsing kafka metadata: %w", err)
}

if err := parseApacheKafkaAuthParams(config, &meta); err != nil {
return meta, err
}

meta.allowIdleConsumers = false
if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err)
}
meta.allowIdleConsumers = t
}

meta.excludePersistentLag = false
if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err)
}
meta.excludePersistentLag = t
}

meta.scaleToZeroOnInvalidOffset = false
if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err)
}
meta.scaleToZeroOnInvalidOffset = t
}

meta.limitToPartitionsWithLag = false
if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err)
}
meta.limitToPartitionsWithLag = t

if meta.allowIdleConsumers && meta.limitToPartitionsWithLag {
return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
}
if len(meta.topic) == 0 && meta.limitToPartitionsWithLag {
return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag")
}
}

meta.triggerIndex = config.TriggerIndex
return meta, nil
}
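For the `*FromEnv` indirection that the removed switch statements above handled by hand (`bootstrapServersFromEnv`, `consumerGroupFromEnv`, `topicFromEnv`), the `order=triggerMetadata;resolvedEnv` tags are expected to resolve the same keys through `ResolvedEnv`. A hedged sketch of that path; the exact lookup convention is assumed, not shown in this diff:

```go
func fromEnvSketch() {
	cfg := &scalersconfig.ScalerConfig{
		TriggerMetadata: map[string]string{
			"bootstrapServersFromEnv": "KAFKA_BROKERS",
			"consumerGroup":           "my-group",
		},
		ResolvedEnv: map[string]string{
			"KAFKA_BROKERS": "kafka-0:9092,kafka-1:9092",
		},
	}
	var meta apacheKafkaMetadata
	if err := cfg.TypedConfig(&meta); err == nil {
		// meta.BootstrapServers should now carry both brokers from KAFKA_BROKERS
		fmt.Println(meta.BootstrapServers)
	}
}
```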

@@ -411,7 +309,7 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log
SASL: saslMechanism,
}
client := kafka.Client{
Addr: kafka.TCP(metadata.bootstrapServers...),
Addr: kafka.TCP(metadata.BootstrapServers...),
Transport: transport,
}
if err != nil {
@@ -430,30 +328,30 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string]
}
s.logger.V(1).Info(fmt.Sprintf("Listed topics %v", metadata.Topics))

if len(s.metadata.topic) == 0 {
if len(s.metadata.Topic) == 0 {
// in case of empty topic name, we will get all topics that the consumer group is subscribed to
describeGrpReq := &kafka.DescribeGroupsRequest{
Addr: s.client.Addr,
GroupIDs: []string{
s.metadata.group,
s.metadata.Group,
},
}
describeGrp, err := s.client.DescribeGroups(ctx, describeGrpReq)
if err != nil {
return nil, fmt.Errorf("error describing group: %w", err)
}
if len(describeGrp.Groups[0].Members) == 0 {
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState)
return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.Group, describeGrp.Groups[0].GroupState)
}
s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp))
s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.Group, describeGrp))

result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
for _, partition := range topic.Partitions {
// if no partition limitations are specified, all partitions are considered
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
if (len(s.metadata.PartitionLimitation) == 0) ||
(len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) {
partitions = append(partitions, partition.ID)
}
}
@@ -464,10 +362,10 @@
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
if kedautil.Contains(s.metadata.Topic, topic.Name) {
for _, partition := range topic.Partitions {
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
if (len(s.metadata.PartitionLimitation) == 0) ||
(len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) {
partitions = append(partitions, partition.ID)
}
}
Expand All @@ -481,7 +379,7 @@ func (s *apacheKafkaScaler) getConsumerOffsets(ctx context.Context, topicPartiti
response, err := s.client.OffsetFetch(
ctx,
&kafka.OffsetFetchRequest{
GroupID: s.metadata.group,
GroupID: s.metadata.Group,
Topics: topicPartitions,
},
)
@@ -514,14 +412,14 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
}

consumerOffset := consumerOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == latest {
retVal := int64(1)
if s.metadata.scaleToZeroOnInvalidOffset {
if s.metadata.ScaleToZeroOnInvalidOffset {
retVal = 0
}
msg := fmt.Sprintf(
"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
topic, s.metadata.group, partitionID, retVal)
topic, s.metadata.Group, partitionID, retVal)
s.logger.V(1).Info(msg)
return retVal, retVal, nil
}
@@ -530,15 +428,15 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic)
}
producerOffset := producerOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
if s.metadata.scaleToZeroOnInvalidOffset {
if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == earliest {
if s.metadata.ScaleToZeroOnInvalidOffset {
return 0, 0, nil
}
return producerOffset, producerOffset, nil
}

// This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events
if s.metadata.excludePersistentLag {
if s.metadata.ExcludePersistentLag {
switch previousOffset, found := s.previousOffsets[topic][partitionID]; {
case !found:
// No record of previous offset, so store current consumer offset
@@ -558,8 +456,8 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
}
}

s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, consumerOffset))
s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, producerOffset))
s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, consumerOffset))
s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, producerOffset))

return producerOffset - consumerOffset, producerOffset - consumerOffset, nil
}
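Condensed illustration of the invalid-offset handling above, covering only the `offsetResetPolicy=latest` branch; the numbers and the `-1` sentinel are stand-ins for the example, not taken from this diff:

```go
package main

import "fmt"

const invalidOffset = -1 // stand-in sentinel for an uncommitted consumer offset

func lag(consumerOffset, producerOffset int64, scaleToZeroOnInvalidOffset bool) int64 {
	if consumerOffset == invalidOffset { // offsetResetPolicy=latest branch
		if scaleToZeroOnInvalidOffset {
			return 0 // allow scale to zero despite the missing offset
		}
		return 1 // keep one replica so consumption can start
	}
	return producerOffset - consumerOffset
}

func main() {
	fmt.Println(lag(100, 120, false))           // 20: normal committed offset
	fmt.Println(lag(invalidOffset, 120, false)) // 1
	fmt.Println(lag(invalidOffset, 120, true))  // 0
}
```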
@@ -578,17 +476,17 @@ func (s *apacheKafkaScaler) Close(context.Context) error {

func (s *apacheKafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricName string
if s.metadata.topic != nil && len(s.metadata.topic) > 0 {
metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.topic, ","))
if s.metadata.Topic != nil && len(s.metadata.Topic) > 0 {
metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.Topic, ","))
} else {
metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.group)
metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.Group)
}

externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(metricName)),
},
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: kafkaMetricType}
return []v2.MetricSpec{metricSpec}
@@ -639,7 +537,7 @@ func (s *apacheKafkaScaler) GetMetricsAndActivity(ctx context.Context, metricNam
}
metric := GenerateMetricInMili(metricName, float64(totalLag))

return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.activationLagThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.ActivationLagThreshold, nil
}

// getTotalLag returns totalLag, totalLagWithPersistent, error
@@ -678,19 +576,19 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro
}
totalTopicPartitions += (int64)(len(partitionsOffsets))
}
s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.lagThreshold))
s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.LagThreshold))

s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Consumer offsets %v, producer offsets %v", consumerOffsets, producerOffsets))

if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag {
if !s.metadata.AllowIdleConsumers || s.metadata.LimitToPartitionsWithLag {
// don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings
upperBound := totalTopicPartitions
if s.metadata.limitToPartitionsWithLag {
if s.metadata.LimitToPartitionsWithLag {
upperBound = partitionsWithLag
}

if (totalLag / s.metadata.lagThreshold) > upperBound {
totalLag = upperBound * s.metadata.lagThreshold
if (totalLag / s.metadata.LagThreshold) > upperBound {
totalLag = upperBound * s.metadata.LagThreshold
}
}
return totalLag, totalLagWithPersistent, nil
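Worked example of the clamp above with made-up numbers: a lag threshold of 10, 20 partitions in total and idle consumers disallowed cap the reported lag at 200, so the HPA never requests more replicas than there are partitions:

```go
package main

import "fmt"

func main() {
	var (
		lagThreshold         int64 = 10  // LagThreshold
		totalTopicPartitions int64 = 20  // partitions visible to the consumer group
		totalLag             int64 = 500 // raw lag summed over all partitions
	)
	upperBound := totalTopicPartitions // would be partitionsWithLag if LimitToPartitionsWithLag were set
	if totalLag/lagThreshold > upperBound {
		totalLag = upperBound * lagThreshold
	}
	fmt.Println(totalLag) // 200 -> at most 20 replicas at a target of 10 per replica
}
```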