KAFKA-14601: Improve exception handling in KafkaEventQueue#13089
KAFKA-14601: Improve exception handling in KafkaEventQueue#13089cmccabe merged 1 commit intoapache:trunkfrom
Conversation
b482a08 to
cdc60cd
Compare
mumrah
left a comment
There was a problem hiding this comment.
Thanks for the patch @cmccabe. Moving the cleanupEvent to the constructor and making it final makes a lot of sense and helps make the code easier to follow.
One question I had is, do we see interrupts in production environments, or just on Jenkins? I would expect KafkaEventQueue to normally get shutdown cleanly via the normal shutdown routines, which (AFAIK) don't use interrupts.
There was a problem hiding this comment.
Why the supplier here for ShutdownEvent? It looks like KafkaEventQueue just takes in the event object
There was a problem hiding this comment.
Can we update the javadoc of run to include the new param and the meaning of the return value
There was a problem hiding this comment.
One thing I've run into before is exceptions in Event#run getting swallowed by the default handleException implementation. Is it worth adding logging for any exception thrown in Event#run?
There was a problem hiding this comment.
OK, but it needs to be at DEBUG level (since usually we don't want this, since it should be logged in handleException.)
There was a problem hiding this comment.
Was this file added intentionally? If so, we should probably decrease the level here to WARN to avoid spamming Jenkins with test logging
There was a problem hiding this comment.
INFO is traditional, let's go with that
Good question. In general our normal shutdown process doesn't use InterruptedException. (At least for the classes that use KafkaEventQueue -- maybe there is someone somewhere using it....) So this is mainly for making Jenkins work smoothly. |
* apache-github/trunk: KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089 KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886) KAFKA-14530: Check state updater more often (apache#13017) KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103) KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301) KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092) KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870) KAFKA-14557; Lock metadata log dir (apache#13058) MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101) KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818) KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032) KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087) KAFKA-14279: Add 3.3.x streams system tests (apache#13077) MINOR: bump streams quickstart pom versions and add to list in gradle.properties (apache#13064) MINOR: Update KRaft cluster upgrade documentation for 3.4 (apache#13063) KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (apache#12998) KAFKA-14570: Fix parenthesis in verifyFullFetchResponsePartitions output (apache#13072) MINOR: Remove public mutable fields from ProducerAppendInfo (apache#13091)
…master * apache-github/trunk: (23 commits) MINOR: Include the inner exception stack trace when re-throwing an exception (apache#12229) MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (apache#13109) Update ProducerConfig.java (apache#13115) KAFKA-14618; Fix off by one error in snapshot id (apache#13108) KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (apache#13106) KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (apache#12901) KAFKA-14568: Move FetchDataInfo and related to storage module (apache#13085) KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (apache#13104) KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089 KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886) KAFKA-14530: Check state updater more often (apache#13017) KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103) KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301) KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092) KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870) KAFKA-14557; Lock metadata log dir (apache#13058) MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101) KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818) KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032) KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087) ...
If KafkaEventQueue gets an InterruptedException while waiting for a condition variable, it currently exits immediately. Instead, it should complete the remaining events exceptionally and then execute the cleanup event. This will allow us to finish any necessary cleanup steps. In order to do this, we require the cleanup event to be provided when the queue is contructed, rather than when it's being shut down. Also, handle cases where Event#handleException itself throws an exception. Remove timed shutdown from the event queue code since nobody was using it, and it adds complexity. Add server-common/src/test/resources/test/log4j.properties since this gradle module somehow avoided having a test log4j.properties up to this point. Reviewers: David Arthur <mumrah@gmail.com>
If KafkaEventQueue gets an InterruptedException while waiting for a condition variable, it
currently exits immediately. Instead, it should complete the remaining events exceptionally and
then execute the cleanup event. This will allow us to finish any necessary cleanup steps.
In order to do this, we require the cleanup event to be provided when the queue is contructed,
rather than when it's being shut down.
Also, handle cases where Event#handleException itself throws an exception.
Remove timed shutdown from the event queue code since nobody was using it, and it adds complexity.
Add server-common/src/test/resources/test/log4j.properties since this gradle module somehow avoided
having a test log4j.properties up to this point.