Skip to content

Commit

Permalink
concurrent broker topic offset fetches
Browse files Browse the repository at this point in the history
  • Loading branch information
VerstraeteBert committed Dec 17, 2021
1 parent 4a1e358 commit e607313
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/Shopify/sarama"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -406,6 +407,11 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

type brokerOffsetResult struct {
offsetResp *sarama.OffsetResponse
err error
}

func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) {
version := int16(0)
if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) {
Expand All @@ -430,26 +436,45 @@ func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, erro
request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1)
}

offsets := make(map[int32]int64)

// Step 2: send requests, one per broker, and collect offsets
resultCh := make(chan brokerOffsetResult)
var wg sync.WaitGroup
wg.Add(len(requests))
for broker, request := range requests {
response, err := broker.GetAvailableOffsets(request)
go func() {
response, err := broker.GetAvailableOffsets(request)
resultCh <- brokerOffsetResult{response, err}
wg.Done()
}()
}

if err != nil {
return nil, err
go func() {
wg.Wait()
close(resultCh)
}()

var mtx = &sync.RWMutex{}
offsets := make(map[int32]int64)
for brokerOffsetRes := range resultCh {
if brokerOffsetRes.err != nil {
close(resultCh)
return nil, brokerOffsetRes.err
}

for _, blocks := range response.Blocks {
for _, blocks := range brokerOffsetRes.offsetResp.Blocks {
for partitionID, block := range blocks {
if block.Err != sarama.ErrNoError {
return nil, block.Err
}

mtx.Lock()
offsets[partitionID] = block.Offset
mtx.Unlock()
}
}
}

return offsets, nil
}

0 comments on commit e607313

Please sign in to comment.