diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 0007da4fc9d..b23f7a569be 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/Shopify/sarama" v2beta2 "k8s.io/api/autoscaling/v2beta2" @@ -214,18 +215,13 @@ func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) { return false, err } - offsets, err := s.getOffsets(partitions) - if err != nil { - return false, err - } - - topicOffsets, err := s.getTopicOffsets(partitions) + consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions) if err != nil { return false, err } for _, partition := range partitions { - lag, err := s.getLagForPartition(partition, offsets, topicOffsets) + lag, err := s.getLagForPartition(partition, consumerOffsets, producerOffsets) if err != nil && lag == invalidOffset { return true, nil } @@ -307,7 +303,7 @@ func (s *kafkaScaler) getPartitions() ([]int32, error) { return partitions, nil } -func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) { +func (s *kafkaScaler) getConsumerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) { offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, map[string][]int32{ s.metadata.topic: partitions, }) @@ -364,6 +360,42 @@ func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricS return []v2beta2.MetricSpec{metricSpec} } +type consumerOffsetResult struct { + consumerOffsets *sarama.OffsetFetchResponse + err error +} + +type producerOffsetResult struct { + producerOffsets map[int32]int64 + err error +} + +func (s *kafkaScaler) getConsumerAndProducerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, map[int32]int64, error) { + consumerChan := make(chan consumerOffsetResult, 1) + go func() { + consumerOffsets, err := s.getConsumerOffsets(partitions) + consumerChan <- consumerOffsetResult{consumerOffsets, err} + }() + + 
producerChan := make(chan producerOffsetResult, 1) + go func() { + producerOffsets, err := s.getProducerOffsets(partitions) + producerChan <- producerOffsetResult{producerOffsets, err} + }() + + consumerRes := <-consumerChan + if consumerRes.err != nil { + return nil, nil, consumerRes.err + } + + producerRes := <-producerChan + if producerRes.err != nil { + return nil, nil, producerRes.err + } + + return consumerRes.consumerOffsets, producerRes.producerOffsets, nil +} + // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { partitions, err := s.getPartitions() @@ -371,19 +403,14 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS return []external_metrics.ExternalMetricValue{}, err } - offsets, err := s.getOffsets(partitions) - if err != nil { - return []external_metrics.ExternalMetricValue{}, err - } - - topicOffsets, err := s.getTopicOffsets(partitions) + consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions) if err != nil { return []external_metrics.ExternalMetricValue{}, err } totalLag := int64(0) for _, partition := range partitions { - lag, _ := s.getLagForPartition(partition, offsets, topicOffsets) + lag, _ := s.getLagForPartition(partition, consumerOffsets, producerOffsets) totalLag += lag } @@ -406,7 +433,12 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS return append([]external_metrics.ExternalMetricValue{}, metric), nil } -func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) { +type brokerOffsetResult struct { + offsetResp *sarama.OffsetResponse + err error +} + +func (s *kafkaScaler) getProducerOffsets(partitions []int32) (map[int32]int64, error) { version := int16(0) if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) { 
version = 1 @@ -430,17 +462,29 @@ func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, erro request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1) } - offsets := make(map[int32]int64) - // Step 2: send requests, one per broker, and collect offsets + resultCh := make(chan brokerOffsetResult, len(requests)) + var wg sync.WaitGroup + wg.Add(len(requests)) for broker, request := range requests { - response, err := broker.GetAvailableOffsets(request) + go func(brCopy *sarama.Broker, reqCopy *sarama.OffsetRequest) { + defer wg.Done() + response, err := brCopy.GetAvailableOffsets(reqCopy) + resultCh <- brokerOffsetResult{response, err} + }(broker, request) + } - if err != nil { - return nil, err + wg.Wait() + close(resultCh) + + offsets := make(map[int32]int64) + + for brokerOffsetRes := range resultCh { + if brokerOffsetRes.err != nil { + return nil, brokerOffsetRes.err } - for _, blocks := range response.Blocks { + for _, blocks := range brokerOffsetRes.offsetResp.Blocks { for partitionID, block := range blocks { if block.Err != sarama.ErrNoError { return nil, block.Err diff --git a/tests/README.md b/tests/README.md index f976dbb6c13..963fc4026ea 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,4 +1,4 @@ -## Prerequisits +## Prerequisites - [node](https://nodejs.org/en/) - `kubectl` logged into a Kubernetes cluster. 
diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 61413775ab9..2e5932990bd 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -11,7 +11,7 @@ const defaultKafkaClient = 'kafka-client' const strimziOperatorVersion = '0.18.0' const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"` -const strimziOperatroYamlFile = tmp.fileSync() +const strimziOperatorYamlFile = tmp.fileSync() const kafkaClusterYamlFile = tmp.fileSync() const kafkaTopicYamlFile = tmp.fileSync() const kafkaClientYamlFile = tmp.fileSync() @@ -25,10 +25,10 @@ test.before('Set up, create necessary resources.', t => { sh.exec(`kubectl create namespace ${defaultNamespace}`) const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout - fs.writeFileSync(strimziOperatroYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`)) + fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`)) t.is( 0, - sh.exec(`kubectl apply -f ${strimziOperatroYamlFile.name} --namespace ${defaultNamespace}`).code, + sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code, 'Deploying Strimzi operator should work.' ) @@ -195,7 +195,7 @@ test.after.always('Clean up, delete created resources.', t => { `${kafkaClientYamlFile.name}`, `${kafkaTopicYamlFile.name}`, `${kafkaClusterYamlFile.name}`, - `${strimziOperatroYamlFile}` + `${strimziOperatorYamlFile.name}` ] for (const resource of resources) { @@ -212,7 +212,7 @@ metadata: spec: kafka: version: 2.5.0 - replicas: 1 + replicas: 3 listeners: plain: {} tls: {}