diff --git a/controllers/redshiftsink_controller.go b/controllers/redshiftsink_controller.go index 642c34c25..fd2735cbe 100644 --- a/controllers/redshiftsink_controller.go +++ b/controllers/redshiftsink_controller.go @@ -435,6 +435,8 @@ func (r *RedshiftSinkReconciler) reconcile( } } + klog.V(2).Infof("rsk/%v reconciling all sinkGroups", rsk.Name) + reloadDupe = sgBuilder. setRedshiftSink(rsk).setClient(r.Client).setScheme(r.Scheme). setType(ReloadDupeSinkGroup). diff --git a/pkg/kafka/watcher.go b/pkg/kafka/watcher.go index d3971344b..5bc5462c6 100644 --- a/pkg/kafka/watcher.go +++ b/pkg/kafka/watcher.go @@ -162,6 +162,11 @@ func (t *kafkaWatch) consumerGroupLag( } offsetFetchRequest.AddPartition(topic, partition) + err = broker.Open(t.client.Config()) + if err != nil && err != sarama.ErrAlreadyConnected { + return defaultLag, fmt.Errorf("Error opening broker connection again, err: %v", err) + } + offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest) if err != nil { return defaultLag, fmt.Errorf(