Skip to content

Commit

Permalink
One more open broker connection
Browse files Browse the repository at this point in the history
  • Loading branch information
alok87 committed Feb 19, 2021
1 parent 0e99d77 commit cf007c2
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 0 deletions.
2 changes: 2 additions & 0 deletions controllers/redshiftsink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ func (r *RedshiftSinkReconciler) reconcile(
}
}

klog.V(2).Infof("rsk/%v reconciling all sinkGroups", rsk.Name)

reloadDupe = sgBuilder.
setRedshiftSink(rsk).setClient(r.Client).setScheme(r.Scheme).
setType(ReloadDupeSinkGroup).
Expand Down
5 changes: 5 additions & 0 deletions pkg/kafka/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (t *kafkaWatch) consumerGroupLag(
}
offsetFetchRequest.AddPartition(topic, partition)

err = broker.Open(t.client.Config())
if err != nil && err != sarama.ErrAlreadyConnected {
return defaultLag, fmt.Errorf("Error opening broker connection again, err: %v", err)
}

offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
if err != nil {
return defaultLag, fmt.Errorf(
Expand Down

0 comments on commit cf007c2

Please sign in to comment.