
Kafka scaler: concurrent offset fetches #2405

Merged
11 commits merged on Jan 3, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@
- Graphite Scaler: use the latest datapoint returned, not the earliest ([#2365](https://github.com/kedacore/keda/pull/2365))
- Kubernetes Workload Scaler: ignore terminated pods ([#2384](https://github.com/kedacore/keda/pull/2384))
- `keda-operator` Cluster Role: add `list` and `watch` access to service accounts ([#2406](https://github.com/kedacore/keda/pull/2406))|([#2410](https://github.com/kedacore/keda/pull/2410))
- Kafka Scaler: concurrently query brokers for consumer and producer offsets ([#2405](https://github.com/kedacore/keda/pull/2405))

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))

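For orientation before the diff itself, here is a minimal, self-contained sketch of the concurrent fetch pattern the changelog entry above describes. The `fetchConsumerOffsets`/`fetchProducerOffsets` helpers are hypothetical stand-ins for the sarama-backed calls in the real scaler, and both results are simplified to plain maps rather than sarama response types (see the kafka_scaler.go diff below for the actual implementation):

```go
package main

import "fmt"

type offsetResult struct {
	offsets map[int32]int64
	err     error
}

// fetchConsumerOffsets is a placeholder for the scaler's call to
// admin.ListConsumerGroupOffsets.
func fetchConsumerOffsets(partitions []int32) (map[int32]int64, error) {
	out := make(map[int32]int64)
	for _, p := range partitions {
		out[p] = 100
	}
	return out, nil
}

// fetchProducerOffsets is a placeholder for the scaler's per-broker
// GetAvailableOffsets requests.
func fetchProducerOffsets(partitions []int32) (map[int32]int64, error) {
	out := make(map[int32]int64)
	for _, p := range partitions {
		out[p] = 142
	}
	return out, nil
}

// fetchBoth issues the two lookups in parallel and joins the results.
func fetchBoth(partitions []int32) (map[int32]int64, map[int32]int64, error) {
	consumerCh := make(chan offsetResult, 1)
	producerCh := make(chan offsetResult, 1)

	go func() {
		offsets, err := fetchConsumerOffsets(partitions)
		consumerCh <- offsetResult{offsets, err}
	}()
	go func() {
		offsets, err := fetchProducerOffsets(partitions)
		producerCh <- offsetResult{offsets, err}
	}()

	consumerRes := <-consumerCh
	if consumerRes.err != nil {
		return nil, nil, consumerRes.err
	}
	producerRes := <-producerCh
	if producerRes.err != nil {
		return nil, nil, producerRes.err
	}
	return consumerRes.offsets, producerRes.offsets, nil
}

func main() {
	consumer, producer, err := fetchBoth([]int32{0, 1, 2})
	fmt.Println(consumer, producer, err)
}
```

Each channel has capacity 1, so a goroutine can deliver its result and exit even when an early error return means the other channel is never read; nothing leaks.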
88 changes: 66 additions & 22 deletions pkg/scalers/kafka_scaler.go
@@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/Shopify/sarama"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
@@ -214,18 +215,13 @@ func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
return false, err
}

offsets, err := s.getOffsets(partitions)
if err != nil {
return false, err
}

topicOffsets, err := s.getTopicOffsets(partitions)
consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions)
if err != nil {
return false, err
}

for _, partition := range partitions {
lag, err := s.getLagForPartition(partition, offsets, topicOffsets)
lag, err := s.getLagForPartition(partition, consumerOffsets, producerOffsets)
if err != nil && lag == invalidOffset {
return true, nil
}
@@ -307,7 +303,7 @@ func (s *kafkaScaler) getPartitions() ([]int32, error) {
return partitions, nil
}

func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) {
func (s *kafkaScaler) getConsumerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) {
offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, map[string][]int32{
s.metadata.topic: partitions,
})
@@ -364,26 +360,57 @@ func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricS
return []v2beta2.MetricSpec{metricSpec}
}

type consumerOffsetResult struct {
consumerOffsets *sarama.OffsetFetchResponse
err error
}

type producerOffsetResult struct {
producerOffsets map[int32]int64
err error
}

func (s *kafkaScaler) getConsumerAndProducerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, map[int32]int64, error) {
consumerChan := make(chan consumerOffsetResult, 1)
go func() {
consumerOffsets, err := s.getConsumerOffsets(partitions)
consumerChan <- consumerOffsetResult{consumerOffsets, err}
}()

producerChan := make(chan producerOffsetResult, 1)
go func() {
producerOffsets, err := s.getProducerOffsets(partitions)
producerChan <- producerOffsetResult{producerOffsets, err}
}()

consumerRes := <-consumerChan
if consumerRes.err != nil {
return nil, nil, consumerRes.err
}

producerRes := <-producerChan
if producerRes.err != nil {
return nil, nil, producerRes.err
}

return consumerRes.consumerOffsets, producerRes.producerOffsets, nil
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
partitions, err := s.getPartitions()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

offsets, err := s.getOffsets(partitions)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

topicOffsets, err := s.getTopicOffsets(partitions)
consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

totalLag := int64(0)
for _, partition := range partitions {
lag, _ := s.getLagForPartition(partition, offsets, topicOffsets)
lag, _ := s.getLagForPartition(partition, consumerOffsets, producerOffsets)

totalLag += lag
}
@@ -406,7 +433,12 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) {
type brokerOffsetResult struct {
offsetResp *sarama.OffsetResponse
err error
}

func (s *kafkaScaler) getProducerOffsets(partitions []int32) (map[int32]int64, error) {
version := int16(0)
if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) {
version = 1
@@ -430,17 +462,29 @@ func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, erro
request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1)
}

offsets := make(map[int32]int64)

// Step 2: send requests, one per broker, and collect offsets
resultCh := make(chan brokerOffsetResult, len(requests))
var wg sync.WaitGroup
wg.Add(len(requests))
for broker, request := range requests {
response, err := broker.GetAvailableOffsets(request)
go func(brCopy *sarama.Broker, reqCopy *sarama.OffsetRequest) {
defer wg.Done()
response, err := brCopy.GetAvailableOffsets(reqCopy)
resultCh <- brokerOffsetResult{response, err}
}(broker, request)
}

if err != nil {
return nil, err
wg.Wait()
close(resultCh)

offsets := make(map[int32]int64)

for brokerOffsetRes := range resultCh {
if brokerOffsetRes.err != nil {
return nil, brokerOffsetRes.err
}

for _, blocks := range response.Blocks {
for _, blocks := range brokerOffsetRes.offsetResp.Blocks {
for partitionID, block := range blocks {
if block.Err != sarama.ErrNoError {
return nil, block.Err
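The per-broker fan-out in `getProducerOffsets` above is a standard WaitGroup plus buffered-channel fan-in. Below is a minimal standalone sketch of that pattern, with a hypothetical `fakeBroker` and `fetchOffsets` standing in for sarama's `*Broker` and its `GetAvailableOffsets` call:

```go
package main

import (
	"fmt"
	"sync"
)

type fakeBroker struct{ id int32 }

// fetchOffsets stands in for broker.GetAvailableOffsets(request).
func (b *fakeBroker) fetchOffsets() (map[int32]int64, error) {
	return map[int32]int64{b.id: 42}, nil
}

type brokerResult struct {
	offsets map[int32]int64
	err     error
}

// collectOffsets queries every broker concurrently and merges the results.
func collectOffsets(brokers []*fakeBroker) (map[int32]int64, error) {
	resultCh := make(chan brokerResult, len(brokers))
	var wg sync.WaitGroup
	wg.Add(len(brokers))

	for _, broker := range brokers {
		go func(b *fakeBroker) {
			defer wg.Done()
			offsets, err := b.fetchOffsets()
			resultCh <- brokerResult{offsets, err}
		}(broker)
	}

	// All sends fit in the buffer, so waiting before closing cannot deadlock.
	wg.Wait()
	close(resultCh)

	merged := make(map[int32]int64)
	for res := range resultCh {
		if res.err != nil {
			return nil, res.err
		}
		for partition, offset := range res.offsets {
			merged[partition] = offset
		}
	}
	return merged, nil
}

func main() {
	offsets, err := collectOffsets([]*fakeBroker{{id: 0}, {id: 1}, {id: 2}})
	fmt.Println(offsets, err)
}
```

Because the channel is buffered to `len(brokers)`, every goroutine's send completes without blocking, so an error from one broker can be returned immediately after `wg.Wait()` without stranding the others.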
2 changes: 1 addition & 1 deletion tests/README.md
@@ -1,4 +1,4 @@
## Prerequisits
## Prerequisites

- [node](https://nodejs.org/en/)
- `kubectl` logged into a Kubernetes cluster.
10 changes: 5 additions & 5 deletions tests/scalers/kafka.test.ts
@@ -11,7 +11,7 @@ const defaultKafkaClient = 'kafka-client'
const strimziOperatorVersion = '0.18.0'
const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`

const strimziOperatroYamlFile = tmp.fileSync()
const strimziOperatorYamlFile = tmp.fileSync()
const kafkaClusterYamlFile = tmp.fileSync()
const kafkaTopicYamlFile = tmp.fileSync()
const kafkaClientYamlFile = tmp.fileSync()
@@ -25,10 +25,10 @@ test.before('Set up, create necessary resources.', t => {
sh.exec(`kubectl create namespace ${defaultNamespace}`)

const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout
fs.writeFileSync(strimziOperatroYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`))
fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`))
t.is(
0,
sh.exec(`kubectl apply -f ${strimziOperatroYamlFile.name} --namespace ${defaultNamespace}`).code,
sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Strimzi operator should work.'
)

@@ -195,7 +195,7 @@ test.after.always('Clean up, delete created resources.', t => {
`${kafkaClientYamlFile.name}`,
`${kafkaTopicYamlFile.name}`,
`${kafkaClusterYamlFile.name}`,
`${strimziOperatroYamlFile}`
`${strimziOperatorYamlFile}`
]

for (const resource of resources) {
@@ -212,7 +212,7 @@ metadata:
spec:
kafka:
version: 2.5.0
replicas: 1
replicas: 3
listeners:
plain: {}
tls: {}