From fff7eacc76d64053f9bafb8c83697b4276944513 Mon Sep 17 00:00:00 2001 From: Jan Wozniak Date: Wed, 29 May 2024 14:08:35 +0200 Subject: [PATCH] Refactor Apache Kafka scaler config (#5804) Signed-off-by: Jan Wozniak Signed-off-by: Ranjith Gopal --- pkg/scalers/apache_kafka_scaler.go | 408 +++++------------- pkg/scalers/apache_kafka_scaler_test.go | 118 ++--- pkg/scalers/scalersconfig/typed_config.go | 94 +++- .../scalersconfig/typed_config_test.go | 32 ++ 4 files changed, 284 insertions(+), 368 deletions(-) diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 890cc7d61cf..d2b0d3f51c9 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -23,7 +23,6 @@ import ( "crypto/tls" "errors" "fmt" - "strconv" "strings" "github.com/go-logr/logr" @@ -49,41 +48,81 @@ type apacheKafkaScaler struct { } type apacheKafkaMetadata struct { - bootstrapServers []string - group string - topic []string - partitionLimitation []int32 - lagThreshold int64 - activationLagThreshold int64 - offsetResetPolicy offsetResetPolicy - allowIdleConsumers bool - excludePersistentLag bool + BootstrapServers []string `keda:"name=bootstrapServers, order=triggerMetadata;resolvedEnv"` + Group string `keda:"name=consumerGroup, order=triggerMetadata;resolvedEnv"` + Topic []string `keda:"name=topic, order=triggerMetadata;resolvedEnv, optional"` + PartitionLimitation []int `keda:"name=partitionLimitation, order=triggerMetadata, optional, range"` + LagThreshold int64 `keda:"name=lagThreshold, order=triggerMetadata, default=10"` + ActivationLagThreshold int64 `keda:"name=activationLagThreshold, order=triggerMetadata, default=0"` + OffsetResetPolicy offsetResetPolicy `keda:"name=offsetResetPolicy, order=triggerMetadata, enum=earliest;latest, default=latest"` + AllowIdleConsumers bool `keda:"name=allowIdleConsumers, order=triggerMetadata, optional"` + ExcludePersistentLag bool `keda:"name=excludePersistentLag, order=triggerMetadata, optional"` // If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can // occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612 - scaleToZeroOnInvalidOffset bool - limitToPartitionsWithLag bool + ScaleToZeroOnInvalidOffset bool `keda:"name=scaleToZeroOnInvalidOffset, order=triggerMetadata, optional"` + LimitToPartitionsWithLag bool `keda:"name=limitToPartitionsWithLag, order=triggerMetadata, optional"` // SASL - saslType kafkaSaslType - username string - password string + SASLType kafkaSaslType `keda:"name=sasl, order=triggerMetadata;authParams, enum=none;plaintext;scram_sha256;scram_sha512;gssapi;aws_msk_iam, default=none"` + Username string `keda:"name=username, order=authParams, optional"` + Password string `keda:"name=password, order=authParams, optional"` // MSK - awsRegion string - awsEndpoint string - awsAuthorization awsutils.AuthorizationMetadata + AWSRegion string `keda:"name=awsRegion, order=triggerMetadata, optional"` + AWSEndpoint string `keda:"name=awsEndpoint, order=triggerMetadata, optional"` + AWSAuthorization awsutils.AuthorizationMetadata // TLS - enableTLS bool - cert string - key string - keyPassword string - ca string + TLS string `keda:"name=tls, order=triggerMetadata;authParams, enum=enable;disable, default=disable"` + Cert string `keda:"name=cert, order=authParams, optional"` + Key string `keda:"name=key, order=authParams, optional"` + KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` + CA string `keda:"name=ca, order=authParams, optional"` triggerIndex int } +func (a *apacheKafkaMetadata) enableTLS() bool { + return a.TLS == stringEnable +} + +func (a *apacheKafkaMetadata) Validate() error { + if a.LagThreshold <= 0 { + return fmt.Errorf("lagThreshold must be a positive number") + } + if a.ActivationLagThreshold < 0 { + return fmt.Errorf("activationLagThreshold must be a positive number") + } + if a.AllowIdleConsumers && a.LimitToPartitionsWithLag { + return fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") + } + if len(a.Topic) == 0 && a.LimitToPartitionsWithLag { + return fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") + } + if len(a.Topic) == 0 && len(a.PartitionLimitation) > 0 { + // no specific topics set, ignoring partitionLimitation setting + a.PartitionLimitation = nil + } + if a.enableTLS() && ((a.Cert == "") != (a.Key == "")) { + return fmt.Errorf("can't set only one of cert or key when using TLS") + } + switch a.SASLType { + case KafkaSASLTypePlaintext: + if a.Username == "" || a.Password == "" { + return fmt.Errorf("username and password must be set when using SASL/PLAINTEXT") + } + case KafkaSASLTypeMskIam: + if a.AWSRegion == "" { + return fmt.Errorf("awsRegion must be set when using AWS MSK IAM") + } + if !a.enableTLS() { + return fmt.Errorf("TLS must be enabled when using AWS MSK IAM") + } + } + return nil +} + const ( KafkaSASLTypeMskIam = "aws_msk_iam" ) @@ -95,13 +134,12 @@ func NewApacheKafkaScaler(ctx context.Context, config *scalersconfig.ScalerConfi return nil, fmt.Errorf("error getting scaler metric type: %w", err) } - logger := InitializeLogger(config, "apache_kafka_scaler") - - kafkaMetadata, err := parseApacheKafkaMetadata(config, logger) + kafkaMetadata, err := parseApacheKafkaMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing kafka metadata: %w", err) } + logger := InitializeLogger(config, "apache_kafka_scaler") client, err := getApacheKafkaClient(ctx, kafkaMetadata, logger) if err != nil { return nil, err @@ -119,246 +157,32 @@ func NewApacheKafkaScaler(ctx context.Context, config *scalersconfig.ScalerConfi } func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apacheKafkaMetadata) error { - meta.enableTLS = false - enableTLS := false - if val, ok := config.TriggerMetadata["tls"]; ok { - switch val { - case stringEnable: - enableTLS = true - case stringDisable: - enableTLS = false - default: - return fmt.Errorf("error incorrect TLS value given, got %s", val) - } - } - - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - if enableTLS { - return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together") - } - switch val { - case stringEnable: - enableTLS = true - case stringDisable: - enableTLS = false - default: - return fmt.Errorf("error incorrect TLS value given, got %s", val) - } - } - - if enableTLS { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { - return errors.New("key must be provided with cert") - } - if keyGiven && !certGiven { - return errors.New("cert must be provided with key") - } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - if value, found := config.AuthParams["keyPassword"]; found { - meta.keyPassword = value - } else { - meta.keyPassword = "" - } - meta.enableTLS = true - } - - meta.saslType = KafkaSASLTypeNone - var saslAuthType string - switch { - case config.TriggerMetadata["sasl"] != "": - saslAuthType = config.TriggerMetadata["sasl"] - default: - saslAuthType = "" + if config.TriggerMetadata["sasl"] != "" && config.AuthParams["sasl"] != "" { + return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together") } - if val, ok := config.AuthParams["sasl"]; ok { - if saslAuthType != "" { - return errors.New("unable to set `sasl` in both ScaledObject and TriggerAuthentication together") - } - saslAuthType = val + if config.TriggerMetadata["tls"] != "" && config.AuthParams["tls"] != "" { + return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together") } - - if saslAuthType != "" { - saslAuthType = strings.TrimSpace(saslAuthType) - switch mode := kafkaSaslType(saslAuthType); mode { - case KafkaSASLTypeMskIam: - meta.saslType = mode - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val - } - if !meta.enableTLS { - return errors.New("TLS is required for MSK") - } - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } else { - return errors.New("no awsRegion given") - } - auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) - if err != nil { - return err - } - meta.awsAuthorization = auth - case KafkaSASLTypePlaintext: - fallthrough - case KafkaSASLTypeSCRAMSHA256: - fallthrough - case KafkaSASLTypeSCRAMSHA512: - if val, ok := config.AuthParams["username"]; ok { - meta.username = strings.TrimSpace(val) - } else { - return errors.New("no username given") - } - if val, ok := config.AuthParams["password"]; ok { - meta.password = strings.TrimSpace(val) - } else { - return errors.New("no password given") - } - case KafkaSASLTypeOAuthbearer: - return errors.New("SASL/OAUTHBEARER is not implemented yet") - default: - return fmt.Errorf("err sasl type %q given", mode) + if meta.SASLType == KafkaSASLTypeMskIam { + auth, err := awsutils.GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) + if err != nil { + return err } + meta.AWSAuthorization = auth } - return nil } -func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (apacheKafkaMetadata, error) { - meta := apacheKafkaMetadata{} - switch { - case config.TriggerMetadata["bootstrapServersFromEnv"] != "": - meta.bootstrapServers = strings.Split(config.ResolvedEnv[config.TriggerMetadata["bootstrapServersFromEnv"]], ",") - case config.TriggerMetadata["bootstrapServers"] != "": - meta.bootstrapServers = strings.Split(config.TriggerMetadata["bootstrapServers"], ",") - default: - return meta, errors.New("no bootstrapServers given") - } - - switch { - case config.TriggerMetadata["consumerGroupFromEnv"] != "": - meta.group = config.ResolvedEnv[config.TriggerMetadata["consumerGroupFromEnv"]] - case config.TriggerMetadata["consumerGroup"] != "": - meta.group = config.TriggerMetadata["consumerGroup"] - default: - return meta, errors.New("no consumer group given") - } - - switch { - case config.TriggerMetadata["topicFromEnv"] != "": - meta.topic = strings.Split(config.ResolvedEnv[config.TriggerMetadata["topicFromEnv"]], ",") - case config.TriggerMetadata["topic"] != "": - meta.topic = strings.Split(config.TriggerMetadata["topic"], ",") - default: - meta.topic = []string{} - logger.V(1).Info(fmt.Sprintf("consumer group %q has no topics specified, "+ - "will use all topics subscribed by the consumer group for scaling", meta.group)) - } - - meta.partitionLimitation = nil - partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"]) - if partitionLimitationMetadata != "" { - if meta.topic == nil || len(meta.topic) == 0 { - logger.V(1).Info("no specific topics set, ignoring partitionLimitation setting") - } else { - pattern := config.TriggerMetadata["partitionLimitation"] - parsed, err := kedautil.ParseInt32List(pattern) - if err != nil { - return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %w", pattern, err) - } - meta.partitionLimitation = parsed - logger.V(0).Info(fmt.Sprintf("partition limit active '%s'", pattern)) - } - } - - meta.offsetResetPolicy = defaultOffsetResetPolicy - - if config.TriggerMetadata["offsetResetPolicy"] != "" { - policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"]) - if policy != earliest && policy != latest { - return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy) - } - meta.offsetResetPolicy = policy - } - - meta.lagThreshold = defaultKafkaLagThreshold - - if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return meta, fmt.Errorf("error parsing %q: %w", lagThresholdMetricName, err) - } - if t <= 0 { - return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName) - } - meta.lagThreshold = t - } - - meta.activationLagThreshold = defaultKafkaActivationLagThreshold - - if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return meta, fmt.Errorf("error parsing %q: %w", activationLagThresholdMetricName, err) - } - if t < 0 { - return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) - } - meta.activationLagThreshold = t +func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig) (apacheKafkaMetadata, error) { + meta := apacheKafkaMetadata{triggerIndex: config.TriggerIndex} + if err := config.TypedConfig(&meta); err != nil { + return meta, fmt.Errorf("error parsing kafka metadata: %w", err) } if err := parseApacheKafkaAuthParams(config, &meta); err != nil { return meta, err } - meta.allowIdleConsumers = false - if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err) - } - meta.allowIdleConsumers = t - } - - meta.excludePersistentLag = false - if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err) - } - meta.excludePersistentLag = t - } - - meta.scaleToZeroOnInvalidOffset = false - if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err) - } - meta.scaleToZeroOnInvalidOffset = t - } - - meta.limitToPartitionsWithLag = false - if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok { - t, err := strconv.ParseBool(val) - if err != nil { - return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err) - } - meta.limitToPartitionsWithLag = t - - if meta.allowIdleConsumers && meta.limitToPartitionsWithLag { - return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") - } - if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { - return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") - } - } - - meta.triggerIndex = config.TriggerIndex return meta, nil } @@ -367,43 +191,43 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log var tlsConfig *tls.Config var err error - logger.V(4).Info(fmt.Sprintf("Kafka SASL type %s", metadata.saslType)) - if metadata.enableTLS { - tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false) + logger.V(4).Info(fmt.Sprintf("Kafka SASL type %s", metadata.SASLType)) + if metadata.enableTLS() { + tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.Cert, metadata.Key, metadata.KeyPassword, metadata.CA, false) if err != nil { return nil, err } } - switch metadata.saslType { + switch metadata.SASLType { case KafkaSASLTypeNone: saslMechanism = nil case KafkaSASLTypePlaintext: saslMechanism = plain.Mechanism{ - Username: metadata.username, - Password: metadata.password, + Username: metadata.Username, + Password: metadata.Password, } case KafkaSASLTypeSCRAMSHA256: - saslMechanism, err = scram.Mechanism(scram.SHA256, metadata.username, metadata.password) + saslMechanism, err = scram.Mechanism(scram.SHA256, metadata.Username, metadata.Password) if err != nil { return nil, err } case KafkaSASLTypeSCRAMSHA512: - saslMechanism, err = scram.Mechanism(scram.SHA512, metadata.username, metadata.password) + saslMechanism, err = scram.Mechanism(scram.SHA512, metadata.Username, metadata.Password) if err != nil { return nil, err } case KafkaSASLTypeOAuthbearer: return nil, errors.New("SASL/OAUTHBEARER is not implemented yet") case KafkaSASLTypeMskIam: - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AWSRegion, metadata.AWSAuthorization) if err != nil { return nil, err } saslMechanism = aws_msk_iam_v2.NewMechanism(*cfg) default: - return nil, fmt.Errorf("err sasl type %q given", metadata.saslType) + return nil, fmt.Errorf("err sasl type %q given", metadata.SASLType) } transport := &kafka.Transport{ @@ -411,7 +235,7 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log SASL: saslMechanism, } client := kafka.Client{ - Addr: kafka.TCP(metadata.bootstrapServers...), + Addr: kafka.TCP(metadata.BootstrapServers...), Transport: transport, } if err != nil { @@ -430,12 +254,12 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string] } s.logger.V(1).Info(fmt.Sprintf("Listed topics %v", metadata.Topics)) - if len(s.metadata.topic) == 0 { + if len(s.metadata.Topic) == 0 { // in case of empty topic name, we will get all topics that the consumer group is subscribed to describeGrpReq := &kafka.DescribeGroupsRequest{ Addr: s.client.Addr, GroupIDs: []string{ - s.metadata.group, + s.metadata.Group, }, } describeGrp, err := s.client.DescribeGroups(ctx, describeGrpReq) @@ -443,17 +267,17 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string] return nil, fmt.Errorf("error describing group: %w", err) } if len(describeGrp.Groups[0].Members) == 0 { - return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState) + return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.Group, describeGrp.Groups[0].GroupState) } - s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp)) + s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.Group, describeGrp)) result := make(map[string][]int) for _, topic := range metadata.Topics { partitions := make([]int, 0) for _, partition := range topic.Partitions { // if no partitions limitatitions are specified, all partitions are considered - if (len(s.metadata.partitionLimitation) == 0) || - (len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) { + if (len(s.metadata.PartitionLimitation) == 0) || + (len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) { partitions = append(partitions, partition.ID) } } @@ -464,10 +288,10 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string] result := make(map[string][]int) for _, topic := range metadata.Topics { partitions := make([]int, 0) - if kedautil.Contains(s.metadata.topic, topic.Name) { + if kedautil.Contains(s.metadata.Topic, topic.Name) { for _, partition := range topic.Partitions { - if (len(s.metadata.partitionLimitation) == 0) || - (len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) { + if (len(s.metadata.PartitionLimitation) == 0) || + (len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) { partitions = append(partitions, partition.ID) } } @@ -481,7 +305,7 @@ func (s *apacheKafkaScaler) getConsumerOffsets(ctx context.Context, topicPartiti response, err := s.client.OffsetFetch( ctx, &kafka.OffsetFetchRequest{ - GroupID: s.metadata.group, + GroupID: s.metadata.Group, Topics: topicPartitions, }, ) @@ -514,14 +338,14 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co } consumerOffset := consumerOffsets[topic][partitionID] - if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { + if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == latest { retVal := int64(1) - if s.metadata.scaleToZeroOnInvalidOffset { + if s.metadata.ScaleToZeroOnInvalidOffset { retVal = 0 } msg := fmt.Sprintf( "invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d", - topic, s.metadata.group, partitionID, retVal) + topic, s.metadata.Group, partitionID, retVal) s.logger.V(1).Info(msg) return retVal, retVal, nil } @@ -530,15 +354,15 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic) } producerOffset := producerOffsets[topic][partitionID] - if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { - if s.metadata.scaleToZeroOnInvalidOffset { + if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == earliest { + if s.metadata.ScaleToZeroOnInvalidOffset { return 0, 0, nil } return producerOffset, producerOffset, nil } // This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events - if s.metadata.excludePersistentLag { + if s.metadata.ExcludePersistentLag { switch previousOffset, found := s.previousOffsets[topic][partitionID]; { case !found: // No record of previous offset, so store current consumer offset @@ -558,8 +382,8 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co } } - s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, consumerOffset)) - s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, producerOffset)) + s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, consumerOffset)) + s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, producerOffset)) return producerOffset - consumerOffset, producerOffset - consumerOffset, nil } @@ -578,17 +402,17 @@ func (s *apacheKafkaScaler) Close(context.Context) error { func (s *apacheKafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { var metricName string - if s.metadata.topic != nil && len(s.metadata.topic) > 0 { - metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.topic, ",")) + if s.metadata.Topic != nil && len(s.metadata.Topic) > 0 { + metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.Topic, ",")) } else { - metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.group) + metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.Group) } externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(metricName)), }, - Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold), + Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold), } metricSpec := v2.MetricSpec{External: externalMetric, Type: kafkaMetricType} return []v2.MetricSpec{metricSpec} @@ -639,7 +463,7 @@ func (s *apacheKafkaScaler) GetMetricsAndActivity(ctx context.Context, metricNam } metric := GenerateMetricInMili(metricName, float64(totalLag)) - return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.activationLagThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.ActivationLagThreshold, nil } // getTotalLag returns totalLag, totalLagWithPersistent, error @@ -678,19 +502,19 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro } totalTopicPartitions += (int64)(len(partitionsOffsets)) } - s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.lagThreshold)) + s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.LagThreshold)) s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Consumer offsets %v, producer offsets %v", consumerOffsets, producerOffsets)) - if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag { + if !s.metadata.AllowIdleConsumers || s.metadata.LimitToPartitionsWithLag { // don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings upperBound := totalTopicPartitions - if s.metadata.limitToPartitionsWithLag { + if s.metadata.LimitToPartitionsWithLag { upperBound = partitionsWithLag } - if (totalLag / s.metadata.lagThreshold) > upperBound { - totalLag = upperBound * s.metadata.lagThreshold + if (totalLag / s.metadata.LagThreshold) > upperBound { + totalLag = upperBound * s.metadata.LagThreshold } } return totalLag, totalLagWithPersistent, nil diff --git a/pkg/scalers/apache_kafka_scaler_test.go b/pkg/scalers/apache_kafka_scaler_test.go index 11c2864cf34..511077d22e6 100644 --- a/pkg/scalers/apache_kafka_scaler_test.go +++ b/pkg/scalers/apache_kafka_scaler_test.go @@ -19,7 +19,7 @@ type parseApacheKafkaMetadataTestData struct { brokers []string group string topic []string - partitionLimitation []int32 + partitionLimitation []int offsetResetPolicy offsetResetPolicy allowIdleConsumers bool excludePersistentLag bool @@ -68,13 +68,13 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{ // failure, no consumer group {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", nil, nil, "latest", false, false, false}, // success, no topics - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // success, ignore partitionLimitation if no topics - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation with whitespaced limitation value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is negative value {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is 0 @@ -86,11 +86,11 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{ // success {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as list - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as range - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation mixed list + ranges - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false}, // failure, partitionLimitation wrong data type {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, more brokers @@ -120,7 +120,7 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{ // success, allowIdleConsumers can be set when limitToPartitionsWithLag is false {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "false"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false}, // failure, topic must be specified when limitToPartitionsWithLag is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, true}, } var parseApacheKafkaAuthParamsTestDataset = []parseApacheKafkaAuthParamsTestData{ @@ -243,10 +243,10 @@ var apacheKafkaMetricIdentifiers = []apacheKafkaMetricIdentifier{ func TestApacheKafkaGetBrokers(t *testing.T) { for _, testData := range parseApacheKafkaMetadataTestDataset { - meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithAuthParams}, logr.Discard()) + meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithAuthParams}) getBrokerApacheKafkaTestBase(t, meta, testData, err) - meta, err = parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithoutAuthParams}, logr.Discard()) + meta, err = parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithoutAuthParams}) getBrokerApacheKafkaTestBase(t, meta, testData, err) } } @@ -258,32 +258,32 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa if testData.isError && err == nil { t.Error("Expected error but got success") } - if len(meta.bootstrapServers) != testData.numBrokers { - t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers)) + if len(meta.BootstrapServers) != testData.numBrokers { + t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.BootstrapServers)) } - if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) { - t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers) + if !reflect.DeepEqual(testData.brokers, meta.BootstrapServers) { + t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.BootstrapServers) } - if meta.group != testData.group { - t.Errorf("Expected group %s but got %s\n", testData.group, meta.group) + if meta.Group != testData.group { + t.Errorf("Expected group %s but got %s\n", testData.group, meta.Group) } - if !reflect.DeepEqual(testData.topic, meta.topic) { - t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic) + if !reflect.DeepEqual(testData.topic, meta.Topic) { + t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.Topic) } - if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) { - t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation) + if !reflect.DeepEqual(testData.partitionLimitation, meta.PartitionLimitation) { + t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.PartitionLimitation) } - if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { - t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) + if err == nil && meta.OffsetResetPolicy != testData.offsetResetPolicy { + t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.OffsetResetPolicy) } - if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers { - t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers) + if err == nil && meta.AllowIdleConsumers != testData.allowIdleConsumers { + t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.AllowIdleConsumers) } - if err == nil && meta.excludePersistentLag != testData.excludePersistentLag { - t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag) + if err == nil && meta.ExcludePersistentLag != testData.excludePersistentLag { + t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.ExcludePersistentLag) } - if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag { - t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag) + if err == nil && meta.LimitToPartitionsWithLag != testData.limitToPartitionsWithLag { + t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.LimitToPartitionsWithLag) } expectedLagThreshold, er := parseExpectedLagThreshold(testData.metadata) @@ -291,44 +291,44 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"]) } - if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold { - t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold) + if meta.LagThreshold != expectedLagThreshold && meta.LagThreshold != defaultKafkaLagThreshold { + t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.LagThreshold, defaultKafkaLagThreshold, expectedLagThreshold) } } func TestApacheKafkaAuthParams(t *testing.T) { // Testing tls and sasl value in TriggerAuthentication - for _, testData := range parseApacheKafkaAuthParamsTestDataset { - meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: validApacheKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) + for i, testData := range parseApacheKafkaAuthParamsTestDataset { + meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: validApacheKafkaMetadata, AuthParams: testData.authParams}) if err != nil && !testData.isError { - t.Error("Expected success but got error", err) + t.Error(i, "Expected success but got error", err) } if testData.isError && err == nil { - t.Error("Expected error but got success") + t.Error(i, "Expected error but got success") } // we can ignore what tls is set if there is error - if err == nil && meta.enableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %#v but got %#v\n", testData.enableTLS, meta.enableTLS) + if err == nil && meta.enableTLS() != testData.enableTLS { + t.Errorf("%v Expected enableTLS to be set to %#v but got %#v\n", i, testData.enableTLS, meta.enableTLS()) } - if err == nil && meta.enableTLS { - if meta.ca != testData.authParams["ca"] { - t.Errorf("Expected ca to be set to %#v but got %#v\n", testData.authParams["ca"], meta.ca) + if err == nil && meta.enableTLS() { + if meta.CA != testData.authParams["ca"] { + t.Errorf("%v Expected ca to be set to %#v but got %#v\n", i, testData.authParams["ca"], meta.CA) } - if meta.cert != testData.authParams["cert"] { - t.Errorf("Expected cert to be set to %#v but got %#v\n", testData.authParams["cert"], meta.cert) + if meta.Cert != testData.authParams["cert"] { + t.Errorf("%v Expected cert to be set to %#v but got %#v\n", i, testData.authParams["cert"], meta.Cert) } - if meta.key != testData.authParams["key"] { - t.Errorf("Expected key to be set to %#v but got %#v\n", testData.authParams["key"], meta.key) + if meta.Key != testData.authParams["key"] { + t.Errorf("%v Expected key to be set to %#v but got %#v\n", i, testData.authParams["key"], meta.Key) } - if meta.keyPassword != testData.authParams["keyPassword"] { - t.Errorf("Expected key to be set to %#v but got %#v\n", testData.authParams["keyPassword"], meta.key) + if meta.KeyPassword != testData.authParams["keyPassword"] { + t.Errorf("%v Expected key to be set to %#v but got %#v\n", i, testData.authParams["keyPassword"], meta.Key) } } } // Testing tls and sasl value in scaledObject for id, testData := range parseApacheKafkaAuthParamsTestDataset2 { - meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard()) + meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) if err != nil && !testData.isError { t.Errorf("Test case: %#v. Expected success but got error %#v", id, err) @@ -337,21 +337,21 @@ func TestApacheKafkaAuthParams(t *testing.T) { t.Errorf("Test case: %#v. Expected error but got success", id) } if !testData.isError { - if testData.metadata["tls"] == stringTrue && !meta.enableTLS { - t.Errorf("Test case: %#v. Expected tls to be set to %#v but got %#v\n", id, testData.metadata["tls"], meta.enableTLS) + if testData.metadata["tls"] == stringTrue && !meta.enableTLS() { + t.Errorf("Test case: %#v. Expected tls to be set to %#v but got %#v\n", id, testData.metadata["tls"], meta.enableTLS()) } - if meta.enableTLS { - if meta.ca != testData.authParams["ca"] { - t.Errorf("Test case: %#v. Expected ca to be set to %#v but got %#v\n", id, testData.authParams["ca"], meta.ca) + if meta.enableTLS() { + if meta.CA != testData.authParams["ca"] { + t.Errorf("Test case: %#v. Expected ca to be set to %#v but got %#v\n", id, testData.authParams["ca"], meta.CA) } - if meta.cert != testData.authParams["cert"] { - t.Errorf("Test case: %#v. Expected cert to be set to %#v but got %#v\n", id, testData.authParams["cert"], meta.cert) + if meta.Cert != testData.authParams["cert"] { + t.Errorf("Test case: %#v. Expected cert to be set to %#v but got %#v\n", id, testData.authParams["cert"], meta.Cert) } - if meta.key != testData.authParams["key"] { - t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["key"], meta.key) + if meta.Key != testData.authParams["key"] { + t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["key"], meta.Key) } - if meta.keyPassword != testData.authParams["keyPassword"] { - t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["keyPassword"], meta.keyPassword) + if meta.KeyPassword != testData.authParams["keyPassword"] { + t.Errorf("Test case: %#v. Expected key to be set to %#v but got %#v\n", id, testData.authParams["keyPassword"], meta.KeyPassword) } } } @@ -360,7 +360,7 @@ func TestApacheKafkaAuthParams(t *testing.T) { func TestApacheKafkaGetMetricSpecForScaling(t *testing.T) { for _, testData := range apacheKafkaMetricIdentifiers { - meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validApacheKafkaWithAuthParams, TriggerIndex: testData.triggerIndex}, logr.Discard()) + meta, err := parseApacheKafkaMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validApacheKafkaWithAuthParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } diff --git a/pkg/scalers/scalersconfig/typed_config.go b/pkg/scalers/scalersconfig/typed_config.go index a47f136f855..b06e1478b88 100644 --- a/pkg/scalers/scalersconfig/typed_config.go +++ b/pkg/scalers/scalersconfig/typed_config.go @@ -75,6 +75,7 @@ const ( nameTag = "name" enumTag = "enum" exclusiveSetTag = "exclusiveSet" + rangeTag = "range" ) // Params is a struct that represents the parameter list that can be used in the keda tag @@ -105,6 +106,9 @@ type Params struct { // ExclusiveSet is the 'exclusiveSet' tag parameter defining the list of values that are mutually exclusive ExclusiveSet []string + + // RangeSeparator is the 'range' tag parameter defining the separator for range values + RangeSeparator string } // IsNested is a function that returns true if the parameter is nested @@ -134,7 +138,7 @@ func (sc *ScalerConfig) TypedConfig(typedConfig any) (err error) { // this shouldn't happen, but calling certain reflection functions may result in panic // if it does, it's better to return a error with stacktrace and reject parsing config // rather than crashing KEDA - err = fmt.Errorf("failed to parse typed config %T resulted in panic\n%v", r, debug.Stack()) + err = fmt.Errorf("failed to parse typed config %T resulted in panic\n%v", r, string(debug.Stack())) } }() err = sc.parseTypedConfig(typedConfig, false) @@ -242,14 +246,14 @@ func (sc *ScalerConfig) setValue(field reflect.Value, params Params) error { } return sc.parseTypedConfig(field.Addr().Interface(), params.Optional) } - if err := setConfigValueHelper(valFromConfig, field); err != nil { + if err := setConfigValueHelper(params, valFromConfig, field); err != nil { return fmt.Errorf("unable to set param %q value %q: %w", params.Name, valFromConfig, err) } return nil } // setConfigValueURLParams is a function that sets the value of the url.Values field -func setConfigValueURLParams(valFromConfig string, field reflect.Value) error { +func setConfigValueURLParams(params Params, valFromConfig string, field reflect.Value) error { field.Set(reflect.MakeMap(reflect.MapOf(field.Type().Key(), field.Type().Elem()))) vals, err := url.ParseQuery(valFromConfig) if err != nil { @@ -258,7 +262,7 @@ func setConfigValueURLParams(valFromConfig string, field reflect.Value) error { for k, vs := range vals { ifcMapKeyElem := reflect.New(field.Type().Key()).Elem() ifcMapValueElem := reflect.New(field.Type().Elem()).Elem() - if err := setConfigValueHelper(k, ifcMapKeyElem); err != nil { + if err := setConfigValueHelper(params, k, ifcMapKeyElem); err != nil { return fmt.Errorf("map key %q: %w", k, err) } for _, v := range vs { @@ -270,7 +274,7 @@ func setConfigValueURLParams(valFromConfig string, field reflect.Value) error { } // setConfigValueMap is a function that sets the value of the map field -func setConfigValueMap(valFromConfig string, field reflect.Value) error { +func setConfigValueMap(params Params, valFromConfig string, field reflect.Value) error { field.Set(reflect.MakeMap(reflect.MapOf(field.Type().Key(), field.Type().Elem()))) split := strings.Split(valFromConfig, elemSeparator) for _, s := range split { @@ -282,11 +286,11 @@ func setConfigValueMap(valFromConfig string, field reflect.Value) error { key := strings.TrimSpace(kv[0]) val := strings.TrimSpace(kv[1]) ifcKeyElem := reflect.New(field.Type().Key()).Elem() - if err := setConfigValueHelper(key, ifcKeyElem); err != nil { + if err := setConfigValueHelper(params, key, ifcKeyElem); err != nil { return fmt.Errorf("map key %q: %w", key, err) } ifcValueElem := reflect.New(field.Type().Elem()).Elem() - if err := setConfigValueHelper(val, ifcValueElem); err != nil { + if err := setConfigValueHelper(params, val, ifcValueElem); err != nil { return fmt.Errorf("map key %q, value %q: %w", key, val, err) } field.SetMapIndex(ifcKeyElem, ifcValueElem) @@ -294,22 +298,69 @@ func setConfigValueMap(valFromConfig string, field reflect.Value) error { return nil } +// canRange is a function that checks if the value can be ranged +func canRange(valFromConfig, elemRangeSeparator string, field reflect.Value) bool { + if elemRangeSeparator == "" { + return false + } + if field.Kind() != reflect.Slice { + return false + } + elemIfc := reflect.New(field.Type().Elem()).Interface() + elemVal := reflect.ValueOf(elemIfc).Elem() + if !elemVal.CanInt() { + return false + } + return strings.Contains(valFromConfig, elemRangeSeparator) +} + +// setConfigValueRange is a function that sets the value of the range field +func setConfigValueRange(params Params, valFromConfig string, field reflect.Value) error { + rangeSplit := strings.Split(valFromConfig, params.RangeSeparator) + if len(rangeSplit) != 2 { + return fmt.Errorf("expected format start%vend, got %q", params.RangeSeparator, valFromConfig) + } + start := reflect.New(field.Type().Elem()).Interface() + end := reflect.New(field.Type().Elem()).Interface() + if err := json.Unmarshal([]byte(rangeSplit[0]), &start); err != nil { + return fmt.Errorf("unable to parse start value %q: %w", rangeSplit[0], err) + } + if err := json.Unmarshal([]byte(rangeSplit[1]), &end); err != nil { + return fmt.Errorf("unable to parse end value %q: %w", rangeSplit[1], err) + } + + startVal := reflect.ValueOf(start).Elem() + endVal := reflect.ValueOf(end).Elem() + for i := startVal.Int(); i <= endVal.Int(); i++ { + elemVal := reflect.New(field.Type().Elem()).Elem() + elemVal.SetInt(i) + field.Set(reflect.Append(field, elemVal)) + } + return nil +} + // setConfigValueSlice is a function that sets the value of the slice field -func setConfigValueSlice(valFromConfig string, field reflect.Value) error { +func setConfigValueSlice(params Params, valFromConfig string, field reflect.Value) error { elemIfc := reflect.New(field.Type().Elem()).Interface() split := strings.Split(valFromConfig, elemSeparator) for i, s := range split { s := strings.TrimSpace(s) - if err := setConfigValueHelper(s, reflect.ValueOf(elemIfc).Elem()); err != nil { - return fmt.Errorf("slice element %d: %w", i, err) + if canRange(s, params.RangeSeparator, field) { + if err := setConfigValueRange(params, s, field); err != nil { + return fmt.Errorf("slice element %d: %w", i, err) + } + } else { + if err := setConfigValueHelper(params, s, reflect.ValueOf(elemIfc).Elem()); err != nil { + return fmt.Errorf("slice element %d: %w", i, err) + } + field.Set(reflect.Append(field, reflect.ValueOf(elemIfc).Elem())) } - field.Set(reflect.Append(field, reflect.ValueOf(elemIfc).Elem())) } return nil } // setParamValueHelper is a function that sets the value of the parameter -func setConfigValueHelper(valFromConfig string, field reflect.Value) error { +func setConfigValueHelper(params Params, valFromConfig string, field reflect.Value) error { paramValue := reflect.ValueOf(valFromConfig) if paramValue.Type().AssignableTo(field.Type()) { field.SetString(valFromConfig) @@ -320,13 +371,13 @@ func setConfigValueHelper(valFromConfig string, field reflect.Value) error { return nil } if field.Type() == reflect.TypeOf(url.Values{}) { - return setConfigValueURLParams(valFromConfig, field) + return setConfigValueURLParams(params, valFromConfig, field) } if field.Kind() == reflect.Map { - return setConfigValueMap(valFromConfig, field) + return setConfigValueMap(params, valFromConfig, field) } if field.Kind() == reflect.Slice { - return setConfigValueSlice(valFromConfig, field) + return setConfigValueSlice(params, valFromConfig, field) } if field.CanInterface() { ifc := reflect.New(field.Type()).Interface() @@ -356,8 +407,10 @@ func (sc *ScalerConfig) configParamValue(params Params) (string, bool) { // this is checked when parsing the tags but adding as default case to avoid any potential future problems return "", false } - if param, ok := m[key]; ok && param != "" { - return strings.TrimSpace(param), true + param, ok := m[key] + param = strings.TrimSpace(param) + if ok && param != "" { + return param, true } } return "", params.IsNested() @@ -413,6 +466,13 @@ func paramsFromTag(tag string, field reflect.StructField) (Params, error) { if len(tsplit) > 1 { params.ExclusiveSet = strings.Split(tsplit[1], tagValueSeparator) } + case rangeTag: + if len(tsplit) == 1 { + params.RangeSeparator = "-" + } + if len(tsplit) == 2 { + params.RangeSeparator = strings.TrimSpace(tsplit[1]) + } case "": continue default: diff --git a/pkg/scalers/scalersconfig/typed_config_test.go b/pkg/scalers/scalersconfig/typed_config_test.go index 8da2a5b9954..26b189c8dc5 100644 --- a/pkg/scalers/scalersconfig/typed_config_test.go +++ b/pkg/scalers/scalersconfig/typed_config_test.go @@ -515,3 +515,35 @@ func TestNoParsingOrder(t *testing.T) { Expect(err).To(BeNil()) Expect(tsdm.DefaultVal2).To(Equal("dv")) } + +// TestRange tests the range param +func TestRange(t *testing.T) { + RegisterTestingT(t) + + sc := &ScalerConfig{ + TriggerMetadata: map[string]string{ + "range": "5-10", + "multiRange": "5-10, 15-20", + "dottedRange": "2..7", + "wrongRange": "5..3", + }, + } + + type testStruct struct { + Range []int `keda:"name=range, order=triggerMetadata, range=-"` + MultiRange []int `keda:"name=multiRange, order=triggerMetadata, range"` + DottedRange []int `keda:"name=dottedRange, order=triggerMetadata, range=.."` + WrongRange []int `keda:"name=wrongRange, order=triggerMetadata, range=.."` + } + + ts := testStruct{} + err := sc.TypedConfig(&ts) + Expect(err).To(BeNil()) + Expect(ts.Range).To(HaveLen(6)) + Expect(ts.Range).To(ConsistOf(5, 6, 7, 8, 9, 10)) + Expect(ts.MultiRange).To(HaveLen(12)) + Expect(ts.MultiRange).To(ConsistOf(5, 6, 7, 8, 9, 10, 15, 16, 17, 18, 19, 20)) + Expect(ts.DottedRange).To(HaveLen(6)) + Expect(ts.DottedRange).To(ConsistOf(2, 3, 4, 5, 6, 7)) + Expect(ts.WrongRange).To(HaveLen(0)) +}