STORM-2538: New kafka spout emits duplicate tuples #2147
Conversation
hmcl commented on May 31, 2017
- Refactored TimeInterval out of KafkaSpoutRetryExponentialBackoff
- Configurable pre-fetch tasks to be run before spout starts processing tuples
- Configurable fixed delay task before activation
- Consumer rebalance task not to poll any data during rebalance
This patch is WIP; the unit test failures still need to be fixed. Submitting for initial review.
arunmahadevan left a comment
Maybe if you can separate out the changes into two parts.
- Adding a configurable delay before the initial startup
- Reducing the duplicates during a rebalance
I think 1 is a less disruptive change and can be addressed first, 2 may need some more work to eliminate duplicates due to offsets for in-flight messages not committed.
private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
private transient Timer commitTimer; // timer == null for auto commit mode
private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
Remove if not required
ok, I forgot.
if (!consumerAutoCommitMode && initialized) {
initialized = false;

if (!consumerAutoCommitMode && task.isComplete()) {
Where/how is the task going to complete?
Do we need some latch/check in a loop here?
It is an asynchronous task with a boolean flag. start() and stop() set that field: start() -> complete=true, stop() -> complete=false.
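A minimal sketch of such a boolean-flag task, assuming the semantics described above; the class name is hypothetical and this is not the actual task class in this patch:

public class BooleanCompletionTask {
    // volatile because start()/stop() may be called from the rebalance callback
    // while the spout loop reads the flag
    private volatile boolean complete = false;

    // mirrors the described semantics: start() marks the task as complete
    public void start() {
        complete = true;
    }

    // stop() marks the task as incomplete, e.g. while a rebalance is in progress
    public void stop() {
        complete = false;
    }

    public boolean isComplete() {
        return complete;
    }
}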
}
initialized = true;

task.start();
Since this is invoked from onPartitionsAssigned, isComplete is going to return true as soon as onPartitionsRevoked is invoked after partition assignment, right?
onPartitionsRevoked could wait for message.timeout.secs so that in-flight messages are acked/failed, and then commit the offsets before returning, to minimize duplicates. Maybe this is what you wanted the ConsumerRebalanceHandlerTask to handle via isCompleted, but it does not seem to do that currently.
As far as I recall, the KafkaConsumer won't let you commit offsets for partitions the consumer is no longer assigned, so committing after onPartitionsRevoked won't work. I think it would be a nicer solution to eliminate most rebalances by switching to manual partition assignment.
I believe what @srdo says is true. Once a rebalance happens, I don't think commits can be done to Kafka. The purpose of the onPartitionsRevoked callback is to handle persisting offsets to separate storage.
It appears that offsets can be committed either to Kafka or externally in onPartitionsRevoked, so the spout can wait to receive the acks for in-flight messages before committing the offsets, to reduce duplicates.
I think it would be a nicer solution to eliminate most rebalances by switching to manual partition assignment
Yes, switching to manual assignment where each task gets a fixed subset of partitions would eliminate rebalances completely. If we can get that out quickly, well and good, but I think it may require a redesign of the current logic; meanwhile this patch can put a band-aid on whatever already exists in production.
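A rough sketch of the commit-in-onPartitionsRevoked idea being discussed, using the plain Kafka consumer API (ConsumerRebalanceListener and commitSync); the class name and the offset-tracking map are assumptions for illustration and are not part of this patch, and the sketch only shows the commit step, not waiting for in-flight acks:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    // hypothetical map of the offsets the spout considers safely processed
    private final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>();

    CommitOnRevokeListener(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit whatever is committable for the partitions being taken away,
        // before the new assignment takes effect.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata offset = committableOffsets.get(tp);
            if (offset != null) {
                toCommit.put(tp, offset);
            }
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // no-op for this sketch
    }
}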
Huh, you're right. Learn something every day :)
I don't think it can work though. The onPartitionsRevoked method is executed in the thread that calls KafkaConsumer.poll (i.e. the executor thread). The thread can't both be waiting in that method and acking tuples.
We already have manual assignment implementations ready to go here and here; the user just has to configure the spout to use them with one of these builders: https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L136
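For reference, a minimal sketch of the difference at the raw Kafka consumer level (broker address, group id, and topic name are placeholders): subscribe() lets the group coordinator assign and rebalance partitions, while assign() pins an explicit partition set and never triggers group rebalances.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SubscribeVsAssign {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "example-group");           // placeholder group id, only needed for subscribe
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe API: the Kafka group coordinator assigns partitions and can
            // move them between consumers (rebalance) whenever group membership changes.
            consumer.subscribe(Arrays.asList("example-topic"));

            // Manual assignment alternative (use instead of subscribe, never both):
            // the caller picks the partitions explicitly, so no group rebalancing occurs.
            // consumer.assign(Arrays.asList(new TopicPartition("example-topic", 0)));
        }
    }
}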
//Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples
(numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets ||
consumerAutoCommitMode);
final boolean poll = !isWaitingToEmit() &&
If I understood correctly, it can be refactored to something like:
boolean result = false;
if (!isWaitingToEmit()) {
if (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || consumerAutoCommitMode) {
result = true;
} else {
LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
}
} else {
LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
}
return result;
I was avoiding refactoring internal logic in these changes, but I will look at it and do it either in this or in a follow-up patch. I will let you know.
* Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
* and prevents locking up the spout when there are too many retriable tuples
*/
(numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || consumerAutoCommitMode);
Add clarifying parentheses:
if (((numUncommittedOffsets - readyMessageCount) < maxUncommittedOffsets) || consumerAutoCommitMode)
Will do, but this was pre-existing code, not really pertaining to the scope of this patch.
I have some other comments as well, but I'm not really a fan of this solution. Delaying for a while before the spout starts emitting tuples seems like an unreliable way to fix this.
I think I might've mentioned offhand on other PRs that we should consider making the manual partition assignment strategy the default and I'd like to raise it again as a potential solution to this.
The subscribe API doesn't really seem to add any value for us. We don't need automatic reassignment when a worker goes down, because Storm will ensure it is restarted, and having Kafka reassign partitions has some drawbacks I think we'd rather be without: all the KafkaSpouts have to spend time rebalancing whenever any executor crashes, the subscribe API behavior is unpredictable and surprising to users who are used to storm-kafka's behavior (https://issues.apache.org/jira/browse/STORM-2514), and now there is this issue.
Is there a good reason we want to keep subscribe as the default behavior, or even want to support that API?
Edit: And also using the subscribe API is incompatible with multiple tasks per executor, as detailed in the comments of STORM-2514
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
I think checkstyle is configured to require first static imports, then all other imports, with alphabetical sorting within each group.
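For illustration, an import block laid out in the order described (static imports first, then all other imports, each group alphabetized); the specific imports are arbitrary examples, not the ones from this file:

// static imports first, alphabetized
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

// then all other imports, alphabetized
import java.util.Collection;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;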
OK. IntelliJ did this for me.
* @return the Storm config for the topology that publishes sentences to kafka using a kafka bolt.
*/
private static Properties newProps(final String brokerUrl, final String topicName) {
static Properties newProps(final String brokerUrl, final String topicName) {
I don't see this called anywhere?
I use this for some integration test cases that are not part of the codebase. This is not a harmful change, as the method is left package-private.
Ok, was just making sure it wasn't an oversight :)
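A minimal sketch of what a helper with this signature might set, assuming standard Kafka producer properties; the exact values used by the real test helper may differ:

import java.util.Properties;

class KafkaProducerPropsExample {
    // Hypothetical helper mirroring the newProps(brokerUrl, topicName) signature above.
    static Properties newProps(final String brokerUrl, final String topicName) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // topicName is typically passed separately to the KafkaBolt / producer send calls
        return props;
    }
}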
* limitations under the License.
*/

package org.apache.storm.kafka.spout.internal;
Could this go in the main package instead? I don't think it makes sense to have an "internal" package if we put user-facing classes in there.
OK, but the issue I have is that the main package already has a large number of classes, some of them only there to support defaults. It's getting hard to track what's happening where. Maybe we should create some sub-packages.
@srdo I agree that this is not the ideal solution, and that's precisely the reason why the pre-fetch tasks are pluggable. There are better approaches, but this one fixes some immediate problems for some customers. The discussion of whether to support dynamic partition assignment or only fixed partitions should happen on the mailing list. I think we should still support it, at least for the 1.x branch, as there are customers using it already. I would conjecture that many more people are using dynamic rather than fixed assignment. For 2.0 it is a matter of deciding what we want to do, and if we end up supporting both, we should clearly separate the concerns of handling fixed and dynamic partition assignment.
@hmcl I think it's fine to fix this if users have a problem that can't be fixed without a code change, but I'm wondering whether having them switch to manual assignment wouldn't work? I think it would be worth checking before we start making this type of workaround. As far as I know it should be fairly easy to try out. You're very likely right that most people are using the default (subscribe-based) assignment.

Just to clarify as well, the manual partition assignment is still dynamic (in the sense that it'll update to account for new partitions); it's just being done by the spout code rather than Kafka. I'll start a thread on this on the dev list.
@srdo @harshach @arunmahadevan in light of the manual partition assignment related fixes, there are two options. I believe on the mailing list we agreed to go with option #1, but I would like to make sure.

1 - We disregard and close this PR. If we do so, we are implicitly telling the user that if they decide to use dynamic partition assignment they are at their own risk, and it won't work with multiple parallelism. Currently the only implementation we support is
@hmcl I am strongly in favor of option 1, since no one has provided a use case for/argued in favor of the subscribe API based partition assignment code. Ideally we'll deprecate (1.x) and remove (2.x) subscribe API support as well, rather than leaving it as an unsupported option.
Closed in favor of #2151