kafka-4295: ConsoleConsumer does not delete the temporary group in zookeeper #2054

Closed
huxihx wants to merge 27 commits into apache:trunk from huxihx:kafka-4295_ConsoleConsumer_fail_to_remove_zknode_onexit

Conversation

@huxihx
Contributor

@huxihx huxihx commented Oct 22, 2016

Because the consumer stop logic and the ZK node removal code run in separate threads, the two threads can interleave in such a way that the persistent node '/consumers/<consumer-group>' is not removed for console consumer groups that do not specify "group.id". This pollutes ZooKeeper with offset information for many inactive console consumer groups.

…roup in zookeeper

Author: huxi

Because the consumer stop logic and the ZK node removal code run in separate threads, the two threads can interleave in such a way that the persistent node '/consumers/<consumer-group>' is not removed for console consumer groups that do not specify "group.id". This pollutes ZooKeeper with offset information for many inactive console consumer groups.
@huxihx
Contributor Author

huxihx commented Oct 24, 2016

@mjsax The failure seems unrelated to this commit. How should I handle this situation? Please advise. Thanks.

@huxihx huxihx closed this Oct 24, 2016
@guozhangwang
Contributor

@amethystic Could you re-open the PR so that a Jenkins build can be triggered again? cc @hachikuji @ijuma for reviews.

@huxihx huxihx reopened this Oct 26, 2016
@huxihx
Contributor Author

huxihx commented Oct 26, 2016

@hachikuji @ijuma please review this PR. Thanks.


// if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
if (!conf.groupIdPassed && conf.options.has(conf.zkConnectOpt))
  ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
Contributor

I think we have a utility in AdminUtils for this.

Contributor Author

@huxihx huxihx Oct 26, 2016

@hachikuji Yes. Although AdminUtils already offers 'deleteConsumerGroupInZK' and 'deleteConsumerGroupInfoForTopicInZK' for this, I noticed that ConsoleConsumer originally uses this snippet of code to delete the inactive console consumer group.
The key point is that we must add this code in addShutdownHook; otherwise the unused ZK nodes might not be deleted if the first auto-commit task has not started yet.

Contributor

Makes sense. It looks like AdminUtils.deleteConsumerGroupInZK is the one we want. Perhaps we could replace both usages? Actually, if we've added this to the shutdown hook, do we still need it in the finally?

Contributor Author

@huxihx huxihx Oct 27, 2016

The finally block already contains this cleanup call:
consumer.cleanup()
conf.formatter.close()
reportRecordCount()
// if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
if (!conf.groupIdPassed)
  ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
shutdownLatch.countDown()

ZkUtils.maybeDeletePath is idempotent, so it can be called many times without any negative impact. I call it from the shutdown hook thread so that this thread tries to delete the znode if the main thread fails to do so.

As for whether we should replace ZkUtils.maybeDeletePath with deleteConsumerGroupInZK, I recommend keeping the original one: adding two lines to the shutdown hook thread is enough to fix the bug, and we would not need any extra regression testing to make sure deleteConsumerGroupInZK behaves as expected. But if you insist, I will close this PR and submit a new one that uses AdminUtils.deleteConsumerGroupInZK.
@hachikuji, what do you think?

Contributor

As far as I can tell, both methods delegate to ZkClient.deleteRecursive, and they both handle ZkNoNodeException, so I'm not sure I see the concern about idempotence. One difference is that maybeDeletePath catches a Throwable, but I don't see a good reason why we need to preserve that, especially since we're shutting down. Maybe I'm missing something?

Another question I have is about the shutdownLatch.await() below. If we're hitting a shutdown path where the maybeDeletePath is not being executed, wouldn't that mean we end up blocking in await()? Can you clarify the specific case that you're trying to handle in this patch?

Also, you don't need to close the PR if you want to change something. Just push a new commit. Your commits will get squashed when we merge anyway.
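
To make the latch question concrete, here is a minimal Java sketch of the handshake between a shutdown hook and the main thread's finally block. It is an illustration under assumptions, not the ConsoleConsumer code: `LatchHandshake`, `stopRequested`, `cleanedUp`, `hookBody` and `simulate` are hypothetical names, and the hook body runs on an ordinary thread so the example can finish without exiting the JVM.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; names are illustrative, not taken from ConsoleConsumer.
class LatchHandshake {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

    // Main-thread work: loop until asked to stop, then clean up in finally.
    void process() {
        try {
            while (!stopRequested.get()) {
                Thread.yield(); // stand-in for consuming messages
            }
        } finally {
            cleanedUp.set(true);       // stand-in for cleanup plus znode removal
            shutdownLatch.countDown(); // releases the waiting hook
        }
    }

    // Hook body: request a stop, then wait for the finally block to run.
    // Returns false if the latch was not released within the timeout.
    boolean hookBody(long timeoutMs) {
        stopRequested.set(true);
        try {
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs process() on a worker thread and the hook body on the caller.
    static boolean simulate() {
        LatchHandshake h = new LatchHandshake();
        Thread worker = new Thread(h::process, "main-loop");
        worker.start();
        boolean released = h.hookBody(5000);
        return released && h.cleanedUp.get();
    }
}
```

The sketch uses a bounded `await` so that a missed `countDown()` shows up as `false` rather than as a hang; an unbounded `await()` would block the hook forever in that case, which is the situation the question above asks about. In a real process the hook body would be wrapped in a `Thread` and passed to `Runtime.getRuntime().addShutdownHook`.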

Contributor Author

@huxihx huxihx Oct 28, 2016

The znode is left behind not because 'maybeDeletePath' is never executed, but because delete(path) fails inside deleteRecursive: the parent znode cannot be deleted if it still has child znodes when delete(path) is called. If ConsoleConsumer runs long enough for the auto-commit thread to create the znode /consumers/<consumer_group>/offsets, everything is fine. If not, the consumer commits the offset during shutdown, in the interval between deleting the child znodes (namely, ids and owners) and deleting the parent znode (/consumers/***). Does that make things clear?

In addition, I have tested the patch in the following scenarios:

  1. New consumer: verified that there is no impact on the new consumer.
  2. Old short-running consumer: znode successfully deleted.
  3. Old long-running consumer: znode successfully deleted.
  4. Old short- or long-running consumer shut down by 'kill -9': same behaviour as before, since when the JVM process is killed this way, even the JVM shutdown hook thread is not guaranteed a chance to run.
  5. Old consumer with group id specified: no impact, since we only want to clean up inactive groups that did not set a group id.

@hachikuji Does that make sense?
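
The interleaving described in this comment can be sketched with a small in-memory model. This is a simplified illustration, not the ZkClient implementation: `FakeZk`, `DeleteRace` and the group name `console-consumer-1` are hypothetical, and the "other thread" is modelled as a callback that runs in the gap between deleting the children and deleting the parent. Real ZooKeeper reports a non-empty node with an exception, which the model reduces to returning `false`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for a znode tree; not the ZkClient API.
class FakeZk {
    private final Set<String> paths = new HashSet<>();

    void create(String path) { paths.add(path); }

    boolean exists(String path) { return paths.contains(path); }

    // Returns the direct children of the given path.
    List<String> children(String parent) {
        String prefix = parent + "/";
        List<String> result = new ArrayList<>();
        for (String p : paths) {
            if (p.startsWith(prefix) && p.indexOf('/', prefix.length()) < 0) {
                result.add(p);
            }
        }
        return result;
    }

    // Refuses to delete a node that still has children.
    boolean delete(String path) {
        if (!children(path).isEmpty()) {
            return false;
        }
        return paths.remove(path);
    }
}

class DeleteRace {
    // Deletes children first, then the parent; 'gap' runs in between.
    static boolean deleteRecursive(FakeZk zk, String path, Runnable gap) {
        for (String child : zk.children(path)) {
            deleteRecursive(zk, child, () -> { });
        }
        gap.run();
        return zk.delete(path);
    }

    // Returns true if the group znode survives the recursive delete.
    static boolean groupLeftBehind(boolean commitInGap) {
        FakeZk zk = new FakeZk();
        String group = "/consumers/console-consumer-1"; // hypothetical group name
        zk.create(group);
        zk.create(group + "/ids");
        zk.create(group + "/owners");
        Runnable gap = () -> {
            if (commitInGap) {
                zk.create(group + "/offsets"); // late offset commit
            }
        };
        deleteRecursive(zk, group, gap);
        return zk.exists(group);
    }
}
```

With no commit in the gap the group znode is removed; with a commit in the gap the parent delete is refused and both the group znode and its new offsets child remain.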

Contributor Author

@hachikuji Any feedback on this PR?

Contributor

Just one question: do we need the additional call to maybeDeletePath in the finally block anymore? Adding it to the shutdown hook seems sufficient based on some testing. Is there any advantage to having it in both locations?

And just one note: it is actually possible to shut down the console consumer before the shutdown hook gets registered, in which case you can still end up with the leftover node in ZK. You can try to trigger this by shutting down the console consumer just after you see it register in ZooKeeper. This edge case is pretty tricky to address, so it's probably more important to get the main paths.

@huxihx
Contributor Author

huxihx commented Nov 4, 2016

@joestein Could you help review this PR? Thanks.

@huxihx
Contributor Author

huxihx commented Nov 25, 2016

@hachikuji Could you address the comments above? Thanks.

@hachikuji
Contributor

@amethystic Apologies for the delay. I'll get to it today or tomorrow.

@huxihx
Contributor Author

huxihx commented Dec 13, 2016

@hachikuji Any chance you could address the comments soon? Please review the pull request. Thank you!

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/97/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/96/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 13, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/95/
Test PASSed (JDK 7 and Scala 2.10).

amethystic added 4 commits December 22, 2016 14:03
1. Remove same cleanup code from within the JVM shutdown hook code block
2. Refine ZkUtils.maybeDeletePath to capture ZkException if to-be-delete znode is not empty
…okeeper

remove useless imports in ZkUtils.scala
@huxihx
Contributor Author

huxihx commented Dec 22, 2016

@hachikuji Yes, you are right. As per your comments, I removed some unnecessary resource cleanup code. ZkUtils.maybeDeletePath was also refined to catch the ZkException thrown when the path to be deleted is not empty. The fix only applies when ConsoleConsumer is shut down via SIGINT or SIGTERM. For "kill -9", we cannot guarantee it works, since the application cannot catch that signal. Does all of this make sense?
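
For context on the signal handling mentioned here: on the JVM, cleanup that should run on SIGINT or SIGTERM is usually registered through `Runtime.addShutdownHook`, and such hooks are not guaranteed to run when the process is killed with SIGKILL ("kill -9"). A minimal Java sketch follows; `CleanupHook` and its methods are hypothetical names, and the cleanup body is a placeholder for whatever deletes the znode, not the code from this patch.

```java
// Hypothetical helper; only Runtime.addShutdownHook and
// Runtime.removeShutdownHook are real JDK APIs here.
class CleanupHook {
    // Registers the cleanup to run when the JVM shuts down in an orderly way.
    static Thread register(Runnable cleanup) {
        Thread hook = new Thread(cleanup, "zk-cleanup-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    // Removes a previously registered hook; returns false if it was not registered.
    static boolean unregister(Thread hook) {
        return Runtime.getRuntime().removeShutdownHook(hook);
    }
}
```

A caller would register the hook once at startup, for example `CleanupHook.register(() -> { /* delete the group znode here */ })`. Anything that kills the process before the hook is registered, or without an orderly shutdown, bypasses it.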

@asfbot

asfbot commented Dec 22, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/361/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 22, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/362/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 22, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/363/
Test PASSed (JDK 8 and Scala 2.11).

amethystic and others added 6 commits December 28, 2016 17:34
During Acceptor initialization, if "Address already in use" error is encountered,
the shutdown latch in each Processor is never counted down. Consequently,
the Kafka server hangs when `Processor.shutdown` is called.

Author: huxi <huxi@zhenrongbao.com>
Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2156 from amethystic/kafka-4428_Kafka_noexit_for_port_already_use
Some of the recent changes to `kafka-run-class.sh` have not been applied to `kafka-run-class.bat`.
These recent changes include setting proper streams or connect classpaths. So any streams or connect use case that leverages `kafka-run-class.bat` would fail with an error like
```
Error: Could not find or load main class org.apache.kafka.streams.???
```

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2238 from vahidhashemian/minor/sync_up_kafka-run-class.bat
Mx4jLoader.scala should explicitly `return true` if the class is successfully loaded and started, otherwise it will return false even if the class is loaded.

Author: Edward Ribeiro <edward.ribeiro@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2295 from eribeiro/mx4jloader-bug
The original Javadoc description for `ConsumerRecord` is slightly confusing in that it can be read in a way such that an object is a key value pair received from Kafka, but (only) consists of the metadata associated with the record. This PR makes it clearer that the metadata is _included_ with the record, and moves the comma so that the phrase "topic name and partition number" in the sentence is more closely associated with the phrase "from which the record is being received".

Author: LoneRifle <LoneRifle@users.noreply.github.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2290 from LoneRifle/patch-1
…ted regex

This makes it consistent with MirrorMaker with the old consumer.

Author: huxi <huxi@zhenrongbao.com>
Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2072 from amethystic/kafka-4351_Regex_behavior_change_for_new_consumer
…to be removed does not exist

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2218 from vahidhashemian/KAFKA-4480
@huxihx
Contributor Author

huxihx commented Jan 1, 2017

@hachikuji please kindly take some time to review. Thanks.

Author: Himani Arora <1himani.arora@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2297 from himani1/refactored_code
amethystic added 2 commits January 7, 2017 12:10
zookeeper

Addressed Ijuma's comments
1. Restored ZkUtils to trunk code
2. Restored ConsoleConsumerTest to trunk code
3. Restored ZkUtils.maybeDeletePath to trunk code
4. Replaced ZkUtils.maybeDeletePath with
AdminUtils.deleteConsumerGroupInZK
@asfbot

asfbot commented Jan 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/602/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/601/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/600/
Test FAILed (JDK 7 and Scala 2.10).

@huxihx
Contributor Author

huxihx commented Jan 7, 2017

@ijuma Please review the PR again. Thanks.

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/610/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/608/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/609/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/611/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/609/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/610/
Test FAILed (JDK 8 and Scala 2.12).

@huxihx
Contributor Author

huxihx commented Jan 12, 2017

@ijuma Please take time to review this PR. Thanks.

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/788/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/786/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/786/
Test FAILed (JDK 8 and Scala 2.12).

@huxihx
Contributor Author

huxihx commented Jan 16, 2017

@ijuma @hachikuji Please take some time to review the PR. Thanks.

@huxihx
Contributor Author

huxihx commented Jan 20, 2017

@guozhangwang Do you know how to retest this fix? All the checks have failed, although it is not clear why.

Member

@ijuma ijuma left a comment

Do I understand correctly that this PR now does the following:

  1. Fix issue where we tried to delete a path in ZK for the new consumer.
  2. Use AdminUtils.deleteConsumerGroupInZK instead of ZKUtils.maybeDeletePath

And the issue described in the JIRA remains since it's an edge case for the old consumer (that we intend to deprecate and remove)?

assertTrue("Consumer group should be created.", zkUtils.getChildren(ZkUtils.ConsumersPath).head == groupID)
} finally {
consumer.stop()
ConsoleConsumer.deleteZkPathForConsumerGroup(conf.options.valueOf(conf.zkConnectOpt), conf.consumerProps.getProperty("group.id"))
Member

It seems like the only thing this test is checking is that this call works. And since that is just calling AdminUtils, not sure if the benefit is worth it given the cost of starting up Kafka and ZK.

Contributor Author

Agreed, but is there a way to simulate sending termination signals? The issue is caused by ZkUtils.maybeDeletePath failing to delete the whole directory when ConsoleConsumer exits after receiving an INT or TERM signal, so what we can do is test whether the replacement method deleteConsumerGroupInZK works as expected. Any good ideas for testing this fix?

@huxihx
Contributor Author

huxihx commented Jan 25, 2017

For your questions:
1: No, the new consumer does not use ZK anymore. The fix is only for the old consumer. An old consumer without a group id specified leaves a lot of "garbage" under /consumers because ZKUtils.deletePath fails to delete it.
2: Yes. Tests showed that AdminUtils.deleteConsumerGroupInZK was able to clean up all the unused ZK paths.

As I said above, I am not sure whether you still want to check in any code that fixes old consumer problems, especially since the community is planning to remove the old consumer soon. It's up to you. If you don't, I am happy to close the PR.
Does that make sense?

@huxihx
Contributor Author

huxihx commented Feb 3, 2017

@ijuma What's the status of this PR? Have I addressed all your comments?

@asfbot

asfbot commented Apr 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3141/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Apr 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3137/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Apr 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3137/
Test FAILed (JDK 7 and Scala 2.10).

@huxihx huxihx closed this May 31, 2017
@ijuma
Member

ijuma commented Aug 4, 2017

@huxihx sorry for the delay. I was going to look at this PR, but it seems like it includes other changes now. Maybe you could simply revive the part where we fix the new consumer not to call ZK unnecessarily?

efeg pushed a commit to efeg/kafka that referenced this pull request May 29, 2024