diff --git a/component/kafka/group/component.go b/component/kafka/group/component.go index cce90fad36..9dae6b0e9a 100644 --- a/component/kafka/group/component.go +++ b/component/kafka/group/component.go @@ -187,25 +187,21 @@ func (c *Component) processing(ctx context.Context) error { c.batchTimeout, c.commitSync) client, err := sarama.NewConsumerGroup(c.brokers, c.group, c.saramaConfig) + componentError = err if err != nil { - componentError = err log.Errorf("error creating consumer group client for kafka component: %v", err) } if client != nil { - // `Consume` should be called inside an infinite loop, when a - // server-side rebalance happens, the consumer session will need to be + // `Consume` should be called inside a loop, when a + // server-side re-balance happens, the consumer session will need to be // recreated to get the new claims - if err := client.Consume(ctx, c.topics, handler); err != nil { + err := client.Consume(ctx, c.topics, handler) + componentError = err + if err != nil { log.Errorf("error from kafka consumer: %v", err) } - // check if context was cancelled or deadline exceeded, signaling that the consumer should stop - if ctx.Err() != nil { - log.Infof("kafka component terminating: context cancelled or deadline exceeded") - break - } - err = client.Close() if err != nil { log.Errorf("error closing kafka consumer: %v", err) @@ -213,11 +209,19 @@ func (c *Component) processing(ctx context.Context) error { } consumerErrorsInc(c.name) + + // check if context was cancelled or deadline exceeded, signaling that the consumer should stop + if ctx.Err() != nil { + log.Infof("kafka component terminating: context cancelled or deadline exceeded") + break + } + if c.retries > 0 { if handler.processedMessages { i = 0 + componentError = nil } - log.Errorf("failed run, retry %d/%d with %v wait: %v", i, c.retries, c.retryWait, err) + log.Errorf("failed run, retry %d/%d with %v wait: %v", i, c.retries, c.retryWait, componentError) time.Sleep(c.retryWait) } @@ -225,7 +229,7 @@ func (c *Component) processing(ctx context.Context) error { // then the handler errored while processing a message. This faulty message is then the reason // behind the component failure. if i == retries && componentError == nil { - componentError = handler.err + componentError = fmt.Errorf("message processing failure exhausted %d retries: %w", i, handler.err) } } return componentError