Consumer poll loop does not commit offsets in case of application shutdown #557

Closed · rapasoft opened this issue Jun 17, 2022 · 1 comment
Labels: closed: notabug (The issue is not a bug)

@rapasoft

Expected Behavior

I am creating a simple @KafkaListener that uses the default enable.auto.commit=true with a 5-second commit interval (all defaults). When the Micronaut application shuts down, the consumer ends abruptly. I would expect the consumer to commit the already-processed offsets synchronously, as declared in the catch block of io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor#createConsumerThreadPollLoop:

```java
...
} catch (WakeupException e) {
    try {
        if (!failed && consumerState.offsetStrategy != OffsetStrategy.DISABLED) {
            kafkaConsumer.commitSync();
        }
    } catch (Throwable ex) {
        LOG.warn("Error committing Kafka offsets on shutdown: {}", ex.getMessage(), ex);
    }
    throw e;
}
...
```

However, this code does not seem to be executed. I tried increasing pollTimeout, but the consumer thread appears to be simply "destroyed" with no graceful shutdown. I would also expect the consumer to leave the consumer group, but there are no logs indicating that.
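For context, WakeupException is the standard Kafka client mechanism for interrupting a blocking poll(): some other thread calls Consumer#wakeup(), and the poll in progress throws. So the catch block quoted above can only run if something actually wakes the consumer up during shutdown. A minimal sketch of the same commit-on-shutdown pattern with a plain KafkaConsumer (the topic, group, and broker address are invented for illustration):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // On SIGTERM, wake the consumer so the blocking poll() throws
        // WakeupException, then wait for the main thread to finish cleanly.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException ignored) {
            }
        }));

        consumer.subscribe(List.of("demo-topic"));
        try {
            while (true) {
                consumer.poll(Duration.ofSeconds(1)); // process records here
            }
        } catch (WakeupException e) {
            // Expected on shutdown: commit the offsets processed so far.
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
}
```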

Actual Behaviour

The consumer "dies" and the offsets that were already processed are never committed to the broker, so those records are re-read when another consumer takes over the partition.

Steps To Reproduce

  1. Create a simple project with micronaut-kafka
  2. Add a simple listener/client (see the sketch below)
  3. Run the application and stop it before the auto-commit interval elapses
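As an illustration of step 2, a minimal listener matching the description above (the class, topic, and group names are invented):

```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

// All defaults apply: enable.auto.commit=true with a 5-second commit interval
@KafkaListener(groupId = "demo-group")
public class DemoListener {

    @Topic("demo-topic")
    public void receive(String message) {
        System.out.println("Received: " + message);
    }
}
```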

Environment Information

  • macOS 12.4
  • JDK 17

Example Application

No response

Version

3.4.0

@guillermocalvo (Contributor)

@rapasoft

I spent some time trying to reproduce the problem you described, but every time the application context is closed, the Kafka consumers are terminated in an orderly manner.

Log messages confirm that consumers do commit offsets:

| Level | Logger | Message |
| --- | --- | --- |
| DEBUG | o.a.k.c.c.i.ConsumerCoordinator | Sending synchronous auto-commit of offsets ... |
| TRACE | o.a.k.c.c.i.ConsumerCoordinator | Sending OffsetCommit request with ... to coordinator ... |
| DEBUG | o.apache.kafka.clients.NetworkClient | Sending OFFSET_COMMIT request with header ... to node ... |
| DEBUG | o.apache.kafka.clients.NetworkClient | Received OFFSET_COMMIT response from node ... for request with header ... |
| DEBUG | o.apache.kafka.clients.NetworkClient | Received FETCH response from node ... for request with header ... |
| DEBUG | o.a.k.c.c.i.ConsumerCoordinator | Committed offset ... for partition ... |

They leave the consumer group:

| Level | Logger | Message |
| --- | --- | --- |
| DEBUG | o.a.k.c.c.i.ConsumerCoordinator | Executing onLeavePrepare with generation ... |
| INFO | o.a.k.c.c.i.ConsumerCoordinator | Revoke previously assigned partitions ... |
| INFO | o.a.k.c.c.i.ConsumerCoordinator | Member ... sending LeaveGroup request to coordinator ... due to the consumer is being closed |
| DEBUG | o.apache.kafka.clients.NetworkClient | Sending LEAVE_GROUP request with header ... |
| INFO | o.a.k.c.c.i.ConsumerCoordinator | Resetting generation and member id due to: consumer pro-actively leaving the group |
| INFO | o.a.k.c.c.i.ConsumerCoordinator | Request joining group due to: consumer pro-actively leaving the group |
| DEBUG | o.apache.kafka.clients.NetworkClient | Received LEAVE_GROUP response from node ... for request with header ... |
| DEBUG | o.a.k.c.c.i.ConsumerCoordinator | LeaveGroup response with ... returned successfully: ... |

Then close their sessions:

| Level | Logger | Message |
| --- | --- | --- |
| DEBUG | o.a.k.clients.FetchSessionHandler | Set the metadata for next fetch request to close the existing session ... |
| DEBUG | o.a.k.clients.FetchSessionHandler | Built full fetch ... for node ... |
| DEBUG | o.a.k.c.c.internals.AbstractFetch | Sending READ_UNCOMMITTED ... to broker ... |
| DEBUG | o.a.k.c.c.internals.AbstractFetch | Adding pending request for node ... |
| DEBUG | o.apache.kafka.clients.NetworkClient | Sending FETCH request with header ... |
| DEBUG | o.apache.kafka.clients.NetworkClient | Received FETCH response from node 1 for request with header ... |
| DEBUG | o.a.k.c.c.internals.AbstractFetch | Successfully sent a close message for fetch session: ... to node: ... |

And finally unregister themselves from the cluster before the application is shut down:

| Level | Logger | Message |
| --- | --- | --- |
| INFO | o.a.kafka.common.utils.AppInfoParser | App info kafka.consumer for ... unregistered |
| DEBUG | o.a.k.clients.consumer.KafkaConsumer | Kafka consumer has been closed |
| INFO | io.micronaut.runtime.Micronaut | Embedded Application shutting down |

Most of these log messages are low-level, so you need to enable them explicitly if you want to read them:

```xml
<logger name="org.apache.kafka" level="ALL"/>
```
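For completeness, a minimal src/main/resources/logback.xml that includes the line above, assuming Micronaut's default Logback setup:

```xml
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>

    <!-- Surface the low-level Kafka client messages quoted above -->
    <logger name="org.apache.kafka" level="ALL"/>

</configuration>
```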

Everything seems to be working as expected, so I'm going to close this issue. Please feel free to raise a new one if it still doesn't work for you.

@guillermocalvo added the closed: notabug (The issue is not a bug) label on Aug 22, 2023