Skip to content

Commit

Permalink
Always close broker connections and check using Connected
Browse files Browse the repository at this point in the history
  • Loading branch information
alok87 committed Feb 19, 2021
1 parent 01be655 commit 0e99d77
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/kafka/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,21 @@ func (t *kafkaWatch) ConsumerGroupLag(
}

for _, broker := range t.client.Brokers() {
defer broker.Close()

err = broker.Open(t.client.Config())
if err != nil && err != sarama.ErrAlreadyConnected {
return lag, fmt.Errorf("Error opening broker connection, err: %v", err)
}

connected, err := broker.Connected()
if err != nil {
return lag, fmt.Errorf("Error checking broker connection, err:%v", err)
}
if !connected {
return lag, fmt.Errorf("Could not connect broker: %+v", broker)
}

lag, err = t.consumerGroupLag(id, topic, 0, broker)
if err != nil {
return lag, fmt.Errorf("Error calculating consumerGroupLag, err: %v", err)
Expand Down

0 comments on commit 0e99d77

Please sign in to comment.