diff --git a/CHANGELOG.md b/CHANGELOG.md
index 96338167e70..669259befce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
 
 - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
 - **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037))
+- **Apache Kafka Scaler**: Refactor config parser ([#5797](https://github.com/kedacore/keda/issues/5797))
 
 #### Experimental
 
diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go
index 890cc7d61cf..8d3a5be9139 100644
--- a/pkg/scalers/apache_kafka_scaler.go
+++ b/pkg/scalers/apache_kafka_scaler.go
@@ -23,7 +23,6 @@ import (
 	"crypto/tls"
 	"errors"
 	"fmt"
-	"strconv"
 	"strings"
 
 	"github.com/go-logr/logr"
@@ -49,20 +48,20 @@ type apacheKafkaScaler struct {
 }
 
 type apacheKafkaMetadata struct {
-	bootstrapServers       []string
-	group                  string
-	topic                  []string
-	partitionLimitation    []int32
-	lagThreshold           int64
-	activationLagThreshold int64
-	offsetResetPolicy      offsetResetPolicy
-	allowIdleConsumers     bool
-	excludePersistentLag   bool
+	BootstrapServers       []string          `keda:"name=bootstrapServers, order=triggerMetadata;resolvedEnv"`
+	Group                  string            `keda:"name=consumerGroup, order=triggerMetadata;resolvedEnv"`
+	Topic                  []string          `keda:"name=topic, order=triggerMetadata;resolvedEnv, optional"`
+	PartitionLimitation    []int             `keda:"name=partitionLimitation, order=triggerMetadata, optional, rangeSeparator=-"`
+	LagThreshold           int64             `keda:"name=lagThreshold, order=triggerMetadata, default=10"`
+	ActivationLagThreshold int64             `keda:"name=activationLagThreshold, order=triggerMetadata, default=0"`
+	OffsetResetPolicy      offsetResetPolicy `keda:"name=offsetResetPolicy, order=triggerMetadata, enum=earliest;latest, default=latest"`
+	AllowIdleConsumers     bool              `keda:"name=allowIdleConsumers, order=triggerMetadata, optional"`
+	ExcludePersistentLag   bool              `keda:"name=excludePersistentLag, order=triggerMetadata, optional"`
 	// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
 	// occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612
-	scaleToZeroOnInvalidOffset bool
-	limitToPartitionsWithLag   bool
+	ScaleToZeroOnInvalidOffset bool `keda:"name=scaleToZeroOnInvalidOffset, order=triggerMetadata, optional"`
+	LimitToPartitionsWithLag   bool `keda:"name=limitToPartitionsWithLag, order=triggerMetadata, optional"`
 
 	// SASL
 	saslType kafkaSaslType
@@ -84,6 +83,26 @@ type apacheKafkaMetadata struct {
 	triggerIndex int
 }
 
+func (a *apacheKafkaMetadata) Validate() error {
+	if a.LagThreshold <= 0 {
+		return fmt.Errorf("lagThreshold must be a positive number")
+	}
+	if a.ActivationLagThreshold < 0 {
+		return fmt.Errorf("activationLagThreshold must be a positive number")
+	}
+	if a.AllowIdleConsumers && a.LimitToPartitionsWithLag {
+		return fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
+	}
+	if len(a.Topic) == 0 && a.LimitToPartitionsWithLag {
+		return fmt.Errorf("topic must be specified when using limitToPartitionsWithLag")
+	}
+	if len(a.Topic) == 0 && len(a.PartitionLimitation) > 0 {
+		// no specific topics set, ignoring partitionLimitation setting
+		a.PartitionLimitation = nil
+	}
+	return nil
+}
+
 const (
 	KafkaSASLTypeMskIam = "aws_msk_iam"
 )
@@ -229,136 +248,15 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
 }
 
 func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (apacheKafkaMetadata, error) {
-	meta := apacheKafkaMetadata{}
-	switch {
-	case config.TriggerMetadata["bootstrapServersFromEnv"] != "":
-		meta.bootstrapServers = strings.Split(config.ResolvedEnv[config.TriggerMetadata["bootstrapServersFromEnv"]], ",")
-	case config.TriggerMetadata["bootstrapServers"] != "":
-		meta.bootstrapServers = strings.Split(config.TriggerMetadata["bootstrapServers"], ",")
-	default:
-		return meta, errors.New("no bootstrapServers given")
-	}
-
-	switch {
-	case config.TriggerMetadata["consumerGroupFromEnv"] != "":
-		meta.group = config.ResolvedEnv[config.TriggerMetadata["consumerGroupFromEnv"]]
-	case config.TriggerMetadata["consumerGroup"] != "":
-		meta.group = config.TriggerMetadata["consumerGroup"]
-	default:
-		return meta, errors.New("no consumer group given")
-	}
-
-	switch {
-	case config.TriggerMetadata["topicFromEnv"] != "":
-		meta.topic = strings.Split(config.ResolvedEnv[config.TriggerMetadata["topicFromEnv"]], ",")
-	case config.TriggerMetadata["topic"] != "":
-		meta.topic = strings.Split(config.TriggerMetadata["topic"], ",")
-	default:
-		meta.topic = []string{}
-		logger.V(1).Info(fmt.Sprintf("consumer group %q has no topics specified, "+
-			"will use all topics subscribed by the consumer group for scaling", meta.group))
-	}
-
-	meta.partitionLimitation = nil
-	partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"])
-	if partitionLimitationMetadata != "" {
-		if meta.topic == nil || len(meta.topic) == 0 {
-			logger.V(1).Info("no specific topics set, ignoring partitionLimitation setting")
-		} else {
-			pattern := config.TriggerMetadata["partitionLimitation"]
-			parsed, err := kedautil.ParseInt32List(pattern)
-			if err != nil {
-				return meta, fmt.Errorf("error parsing in partitionLimitation '%s': %w", pattern, err)
-			}
-			meta.partitionLimitation = parsed
-			logger.V(0).Info(fmt.Sprintf("partition limit active '%s'", pattern))
-		}
-	}
-
-	meta.offsetResetPolicy = defaultOffsetResetPolicy
-
-	if config.TriggerMetadata["offsetResetPolicy"] != "" {
-		policy := offsetResetPolicy(config.TriggerMetadata["offsetResetPolicy"])
-		if policy != earliest && policy != latest {
-			return meta, fmt.Errorf("err offsetResetPolicy policy %q given", policy)
-		}
-		meta.offsetResetPolicy = policy
-	}
-
-	meta.lagThreshold = defaultKafkaLagThreshold
-
-	if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
-		t, err := strconv.ParseInt(val, 10, 64)
-		if err != nil {
-			return meta, fmt.Errorf("error parsing %q: %w", lagThresholdMetricName, err)
-		}
-		if t <= 0 {
-			return meta, fmt.Errorf("%q must be positive number", lagThresholdMetricName)
-		}
-		meta.lagThreshold = t
-	}
-
-	meta.activationLagThreshold = defaultKafkaActivationLagThreshold
-
-	if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok {
-		t, err := strconv.ParseInt(val, 10, 64)
-		if err != nil {
-			return meta, fmt.Errorf("error parsing %q: %w", activationLagThresholdMetricName, err)
-		}
-		if t < 0 {
-			return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName)
-		}
-		meta.activationLagThreshold = t
+	meta := apacheKafkaMetadata{triggerIndex: config.TriggerIndex}
+	if err := config.TypedConfig(&meta); err != nil {
+		return meta, fmt.Errorf("error parsing kafka metadata: %w", err)
 	}
 
 	if err := parseApacheKafkaAuthParams(config, &meta); err != nil {
 		return meta, err
 	}
 
-	meta.allowIdleConsumers = false
-	if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok {
-		t, err := strconv.ParseBool(val)
-		if err != nil {
-			return meta, fmt.Errorf("error parsing allowIdleConsumers: %w", err)
-		}
-		meta.allowIdleConsumers = t
-	}
-
-	meta.excludePersistentLag = false
-	if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok {
-		t, err := strconv.ParseBool(val)
-		if err != nil {
-			return meta, fmt.Errorf("error parsing excludePersistentLag: %w", err)
-		}
-		meta.excludePersistentLag = t
-	}
-
-	meta.scaleToZeroOnInvalidOffset = false
-	if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok {
-		t, err := strconv.ParseBool(val)
-		if err != nil {
-			return meta, fmt.Errorf("error parsing scaleToZeroOnInvalidOffset: %w", err)
-		}
-		meta.scaleToZeroOnInvalidOffset = t
-	}
-
-	meta.limitToPartitionsWithLag = false
-	if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok {
-		t, err := strconv.ParseBool(val)
-		if err != nil {
-			return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err)
-		}
-		meta.limitToPartitionsWithLag = t
-
-		if meta.allowIdleConsumers && meta.limitToPartitionsWithLag {
-			return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
-		}
-		if len(meta.topic) == 0 && meta.limitToPartitionsWithLag {
-			return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag")
-		}
-	}
-
-	meta.triggerIndex = config.TriggerIndex
 	return meta, nil
 }
 
@@ -411,7 +309,7 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log
 		SASL: saslMechanism,
 	}
 	client := kafka.Client{
-		Addr:      kafka.TCP(metadata.bootstrapServers...),
+		Addr:      kafka.TCP(metadata.BootstrapServers...),
 		Transport: transport,
 	}
 	if err != nil {
@@ -430,12 +328,12 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string]
 	}
 	s.logger.V(1).Info(fmt.Sprintf("Listed topics %v", metadata.Topics))
 
-	if len(s.metadata.topic) == 0 {
+	if len(s.metadata.Topic) == 0 {
 		// in case of empty topic name, we will get all topics that the consumer group is subscribed to
 		describeGrpReq := &kafka.DescribeGroupsRequest{
 			Addr: s.client.Addr,
 			GroupIDs: []string{
-				s.metadata.group,
+				s.metadata.Group,
 			},
 		}
 		describeGrp, err := s.client.DescribeGroups(ctx, describeGrpReq)
@@ -443,17 +341,17 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string]
 			return nil, fmt.Errorf("error describing group: %w", err)
 		}
 		if len(describeGrp.Groups[0].Members) == 0 {
-			return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.group, describeGrp.Groups[0].GroupState)
+			return nil, fmt.Errorf("no active members in group %s, group-state is %s", s.metadata.Group, describeGrp.Groups[0].GroupState)
 		}
-		s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.group, describeGrp))
+		s.logger.V(4).Info(fmt.Sprintf("Described group %s with response %v", s.metadata.Group, describeGrp))
 
 		result := make(map[string][]int)
 		for _, topic := range metadata.Topics {
 			partitions := make([]int, 0)
 			for _, partition := range topic.Partitions {
 				// if no partitions limitatitions are specified, all partitions are considered
-				if (len(s.metadata.partitionLimitation) == 0) ||
-					(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
+				if (len(s.metadata.PartitionLimitation) == 0) ||
+					(len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) {
 					partitions = append(partitions, partition.ID)
 				}
 			}
@@ -464,10 +362,10 @@ func (s *apacheKafkaScaler) getTopicPartitions(ctx context.Context) (map[string]
 	result := make(map[string][]int)
 	for _, topic := range metadata.Topics {
 		partitions := make([]int, 0)
-		if kedautil.Contains(s.metadata.topic, topic.Name) {
+		if kedautil.Contains(s.metadata.Topic, topic.Name) {
 			for _, partition := range topic.Partitions {
-				if (len(s.metadata.partitionLimitation) == 0) ||
-					(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
+				if (len(s.metadata.PartitionLimitation) == 0) ||
+					(len(s.metadata.PartitionLimitation) > 0 && kedautil.Contains(s.metadata.PartitionLimitation, partition.ID)) {
 					partitions = append(partitions, partition.ID)
 				}
 			}
@@ -481,7 +379,7 @@ func (s *apacheKafkaScaler) getConsumerOffsets(ctx context.Context, topicPartiti
 	response, err := s.client.OffsetFetch(
 		ctx,
 		&kafka.OffsetFetchRequest{
-			GroupID: s.metadata.group,
+			GroupID: s.metadata.Group,
 			Topics:  topicPartitions,
 		},
 	)
@@ -514,14 +412,14 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co
 	}
 
 	consumerOffset := consumerOffsets[topic][partitionID]
-	if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
+	if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == latest {
 		retVal := int64(1)
-		if s.metadata.scaleToZeroOnInvalidOffset {
+		if s.metadata.ScaleToZeroOnInvalidOffset {
 			retVal = 0
 		}
 		msg := fmt.Sprintf(
 			"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
Returning with lag of %d", - topic, s.metadata.group, partitionID, retVal) + topic, s.metadata.Group, partitionID, retVal) s.logger.V(1).Info(msg) return retVal, retVal, nil } @@ -530,15 +428,15 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic) } producerOffset := producerOffsets[topic][partitionID] - if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { - if s.metadata.scaleToZeroOnInvalidOffset { + if consumerOffset == invalidOffset && s.metadata.OffsetResetPolicy == earliest { + if s.metadata.ScaleToZeroOnInvalidOffset { return 0, 0, nil } return producerOffset, producerOffset, nil } // This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events - if s.metadata.excludePersistentLag { + if s.metadata.ExcludePersistentLag { switch previousOffset, found := s.previousOffsets[topic][partitionID]; { case !found: // No record of previous offset, so store current consumer offset @@ -558,8 +456,8 @@ func (s *apacheKafkaScaler) getLagForPartition(topic string, partitionID int, co } } - s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, consumerOffset)) - s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.group, partitionID, producerOffset)) + s.logger.V(4).Info(fmt.Sprintf("Consumer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, consumerOffset)) + s.logger.V(4).Info(fmt.Sprintf("Producer offset for topic %s in group %s and partition %d is %d", topic, s.metadata.Group, partitionID, producerOffset)) return producerOffset - consumerOffset, producerOffset - consumerOffset, nil } @@ -578,17 +476,17 @@ func (s *apacheKafkaScaler) Close(context.Context) error { func (s *apacheKafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { var metricName string - if s.metadata.topic != nil && len(s.metadata.topic) > 0 { - metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.topic, ",")) + if s.metadata.Topic != nil && len(s.metadata.Topic) > 0 { + metricName = fmt.Sprintf("kafka-%s", strings.Join(s.metadata.Topic, ",")) } else { - metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.group) + metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.Group) } externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(metricName)), }, - Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold), + Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold), } metricSpec := v2.MetricSpec{External: externalMetric, Type: kafkaMetricType} return []v2.MetricSpec{metricSpec} @@ -639,7 +537,7 @@ func (s *apacheKafkaScaler) GetMetricsAndActivity(ctx context.Context, metricNam } metric := GenerateMetricInMili(metricName, float64(totalLag)) - return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.activationLagThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, totalLagWithPersistent > s.metadata.ActivationLagThreshold, nil } // getTotalLag returns totalLag, totalLagWithPersistent, error @@ -678,19 +576,19 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro } totalTopicPartitions += (int64)(len(partitionsOffsets)) } - 
s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.lagThreshold)) + s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.LagThreshold)) s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Consumer offsets %v, producer offsets %v", consumerOffsets, producerOffsets)) - if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag { + if !s.metadata.AllowIdleConsumers || s.metadata.LimitToPartitionsWithLag { // don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings upperBound := totalTopicPartitions - if s.metadata.limitToPartitionsWithLag { + if s.metadata.LimitToPartitionsWithLag { upperBound = partitionsWithLag } - if (totalLag / s.metadata.lagThreshold) > upperBound { - totalLag = upperBound * s.metadata.lagThreshold + if (totalLag / s.metadata.LagThreshold) > upperBound { + totalLag = upperBound * s.metadata.LagThreshold } } return totalLag, totalLagWithPersistent, nil diff --git a/pkg/scalers/apache_kafka_scaler_test.go b/pkg/scalers/apache_kafka_scaler_test.go index 11c2864cf34..215d67cd331 100644 --- a/pkg/scalers/apache_kafka_scaler_test.go +++ b/pkg/scalers/apache_kafka_scaler_test.go @@ -19,7 +19,7 @@ type parseApacheKafkaMetadataTestData struct { brokers []string group string topic []string - partitionLimitation []int32 + partitionLimitation []int offsetResetPolicy offsetResetPolicy allowIdleConsumers bool excludePersistentLag bool @@ -68,13 +68,13 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{ // failure, no consumer group {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", nil, nil, "latest", false, false, false}, // success, no topics - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // success, ignore partitionLimitation if no topics - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation with whitespaced limitation value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, 
offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is negative value {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is 0 @@ -86,11 +86,11 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{ // success {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as list - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as range - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation mixed list + ranges - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false}, // failure, partitionLimitation wrong data type {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, more brokers @@ -120,7 +120,7 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{ // success, allowIdleConsumers can be set when limitToPartitionsWithLag is false {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "false"}, false, 1, []string{"foobar:9092"}, "my-group", 
[]string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false}, // failure, topic must be specified when limitToPartitionsWithLag is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", nil, nil, offsetResetPolicy("latest"), false, false, true}, } var parseApacheKafkaAuthParamsTestDataset = []parseApacheKafkaAuthParamsTestData{ @@ -258,32 +258,32 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa if testData.isError && err == nil { t.Error("Expected error but got success") } - if len(meta.bootstrapServers) != testData.numBrokers { - t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers)) + if len(meta.BootstrapServers) != testData.numBrokers { + t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.BootstrapServers)) } - if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) { - t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers) + if !reflect.DeepEqual(testData.brokers, meta.BootstrapServers) { + t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.BootstrapServers) } - if meta.group != testData.group { - t.Errorf("Expected group %s but got %s\n", testData.group, meta.group) + if meta.Group != testData.group { + t.Errorf("Expected group %s but got %s\n", testData.group, meta.Group) } - if !reflect.DeepEqual(testData.topic, meta.topic) { - t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic) + if !reflect.DeepEqual(testData.topic, meta.Topic) { + t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.Topic) } - if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) { - t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation) + if !reflect.DeepEqual(testData.partitionLimitation, meta.PartitionLimitation) { + t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.PartitionLimitation) } - if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy { - t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy) + if err == nil && meta.OffsetResetPolicy != testData.offsetResetPolicy { + t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.OffsetResetPolicy) } - if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers { - t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers) + if err == nil && meta.AllowIdleConsumers != testData.allowIdleConsumers { + t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.AllowIdleConsumers) } - if err == nil && meta.excludePersistentLag != testData.excludePersistentLag { - t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag) + if err == nil && meta.ExcludePersistentLag != testData.excludePersistentLag { + t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.ExcludePersistentLag) } - if err == nil && meta.limitToPartitionsWithLag != 
-		t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
+	if err == nil && meta.LimitToPartitionsWithLag != testData.limitToPartitionsWithLag {
+		t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.LimitToPartitionsWithLag)
 	}
 
 	expectedLagThreshold, er := parseExpectedLagThreshold(testData.metadata)
@@ -291,8 +291,8 @@ func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testDa
 		t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"])
 	}
 
-	if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold {
-		t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold)
+	if meta.LagThreshold != expectedLagThreshold && meta.LagThreshold != defaultKafkaLagThreshold {
+		t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.LagThreshold, defaultKafkaLagThreshold, expectedLagThreshold)
 	}
 }
 
 func TestApacheKafkaAuthParams(t *testing.T) {
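
Reviewer note (not part of the patch): the refactor moves every key/env lookup, default, and enum check into the `keda:` struct tags consumed by scalersconfig.TypedConfig, with cross-field rules collected in Validate. Below is a minimal sketch of the pattern using only the API surface visible in this diff. The demoMetadata struct is hypothetical, and the assumption that the declarative parser invokes a Validate() method on the target struct is inferred from the diff itself: parseApacheKafkaMetadata no longer calls validation explicitly, yet the tests still expect validation failures to surface from it.

	package main

	import (
		"fmt"

		"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
	)

	// demoMetadata is a hypothetical struct mirroring the tag grammar above:
	// name= maps a field to a trigger-metadata key, order= lists the sources
	// consulted (triggerMetadata, resolvedEnv), and default=/optional relax
	// the requiredness.
	type demoMetadata struct {
		BootstrapServers []string `keda:"name=bootstrapServers, order=triggerMetadata;resolvedEnv"`
		LagThreshold     int64    `keda:"name=lagThreshold, order=triggerMetadata, default=10"`
	}

	// Validate carries the cross-field rules; per this diff it appears to be
	// invoked by the declarative parser rather than called by hand.
	func (d *demoMetadata) Validate() error {
		if d.LagThreshold <= 0 {
			return fmt.Errorf("lagThreshold must be a positive number")
		}
		return nil
	}

	func main() {
		// A ScalerConfig as KEDA would build it from a ScaledObject trigger.
		cfg := &scalersconfig.ScalerConfig{
			TriggerMetadata: map[string]string{"bootstrapServers": "broker-1:9092,broker-2:9092"},
		}
		meta := demoMetadata{}
		if err := cfg.TypedConfig(&meta); err != nil {
			fmt.Println("parse error:", err)
			return
		}
		// Expected: the comma-separated value split into a slice, and the
		// default applied: [broker-1:9092 broker-2:9092] 10
		fmt.Println(meta.BootstrapServers, meta.LagThreshold)
	}

The rangeSeparator=- option on PartitionLimitation is what lets the tag-driven parser expand "1-4,8,10-12" into []int{1, 2, 3, 4, 8, 10, 11, 12} in the tests above, which is why the explicit kedautil.ParseInt32List call (and the []int32 element type) could be dropped.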