
Conversation

@srdo (Contributor) commented Sep 20, 2016

…tion reassignment in new Kafka spout

See https://issues.apache.org/jira/browse/STORM-2104

In order to test this change I added a factory for KafkaConsumers. Please let me know if there's a nicer way to mock it.

In addition to fixing the described issue, I changed a few types on KafkaSpoutConfig. If the user specifies a non-serializable Deserializer in either setter in KafkaSpoutConfig.Builder, the topology can't start because Nimbus can't serialize KafkaSpoutConfig.

I borrowed a few classes from #1679. I hope that's okay with you @jfenc91.
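The consumer factory mentioned above could be sketched roughly as follows. This is only an illustration: the names (`KafkaConsumerFactory`, `createConsumer`, `open`) are hypothetical, and tiny stub interfaces stand in for the Kafka client types so the example is self-contained.

```java
import java.util.List;
import java.util.Map;

public class ConsumerFactoryDemo {
    // Stub standing in for the slice of the Kafka Consumer API the spout needs
    interface Consumer<K, V> {
        List<V> poll(long timeoutMs);
    }

    // Hypothetical factory indirection: production code would create a real
    // KafkaConsumer here, while tests supply a mock instead
    interface KafkaConsumerFactory<K, V> {
        Consumer<K, V> createConsumer(Map<String, Object> config);
    }

    // The spout would call the factory rather than `new KafkaConsumer<>(...)` directly
    static <K, V> Consumer<K, V> open(KafkaConsumerFactory<K, V> factory,
                                      Map<String, Object> config) {
        return factory.createConsumer(config);
    }

    public static void main(String[] args) {
        // A test hands in a canned consumer instead of opening a real Kafka connection
        KafkaConsumerFactory<String, String> mockFactory =
                cfg -> timeoutMs -> List.of("record-1", "record-2");
        Consumer<String, String> consumer = open(mockFactory, Map.of());
        System.out.println(consumer.poll(100).size()); // prints 2
    }
}
```

Because the spout only depends on the factory interface, no mocking framework is strictly required; a lambda returning canned records is enough.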

@srdo (Contributor, Author) commented Sep 21, 2016

The test failure is due to Maven failing to download dependencies for storm-core; everything except storm-core (!storm-core) passed.


// Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
emitted.removeIf(msgId -> !partitionsSet.contains(msgId.getTopicPartition()));
Contributor:

This looks good. I think this same logic may be needed in onPartitionsRevoked as well. Also, I believe the messages may need to be removed from the retryService too. Please correct me if I am wrong!

Contributor Author:

The messages should be getting removed from the retryService on line 156. My impression is that onPartitionsAssigned will be called immediately after onPartitionsRevoked, before the current call to poll returns (see https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html).
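To illustrate the cleanup in the diff above, here is a self-contained sketch of the `removeIf` logic. The stub records `Partition` and `MsgId` are stand-ins for Kafka's `TopicPartition` and the spout's message id type, not the real classes, and `retainAssigned` is a hypothetical name for the step the diff performs inline.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReassignmentCleanup {
    // Stubs standing in for Kafka's TopicPartition and the spout's KafkaSpoutMessageId
    record Partition(String topic, int id) {}
    record MsgId(Partition partition, long offset) {}

    // Drop emitted ids whose partition is no longer assigned to this spout:
    // they can't be acked here anymore and shouldn't be retried
    static void retainAssigned(Set<MsgId> emitted, Collection<Partition> assigned) {
        Set<Partition> assignedSet = new HashSet<>(assigned);
        emitted.removeIf(msgId -> !assignedSet.contains(msgId.partition()));
    }

    public static void main(String[] args) {
        Partition p0 = new Partition("topic", 0);
        Partition p1 = new Partition("topic", 1);
        Set<MsgId> emitted = new HashSet<>(List.of(new MsgId(p0, 5), new MsgId(p1, 7)));
        retainAssigned(emitted, List.of(p0)); // suppose p1 was revoked in the rebalance
        System.out.println(emitted.size()); // prints 1: only the p0 message id remains
    }
}
```

Copying `partitions` into a `HashSet` first keeps the `removeIf` predicate O(1) per element, even if the listener is handed a `List` of assignments.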

/**
 * @param <T> The type this deserializer deserializes to.
 */
public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {
}
Contributor:

Why do we need this wrapper marker interface?

Contributor Author:

I thought it was nice to have, since setKey/ValueDeserializer in the builder implicitly require the deserializer to be serializable. For example, if you try to set the standard Kafka StringDeserializer via those methods, you'll get a NotSerializableException when the topology is submitted to Storm, since the deserializers become fields on the KafkaSpoutConfig instance held by the KafkaSpout.
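To illustrate the problem, here is a minimal sketch. A stub `Deserializer` interface stands in for Kafka's, and `SerializableStringDeserializer` is a hypothetical implementation; the point is only that anything set on `KafkaSpoutConfig` must survive the Java serialization round-trip that happens at topology submission.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class SerializableDeserializerDemo {
    // Stub standing in for org.apache.kafka.common.serialization.Deserializer
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // The marker interface from the patch: a Deserializer that is also Serializable
    interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {}

    // A hypothetical deserializer that survives Java serialization, unlike
    // Kafka's plain StringDeserializer
    static class SerializableStringDeserializer
            implements SerializableDeserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableDeserializer<String> deser = new SerializableStringDeserializer();
        // Round-trip through Java serialization, as happens at topology submission
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(deser);
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        @SuppressWarnings("unchecked")
        Deserializer<String> copy = (Deserializer<String>) in.readObject();
        System.out.println(copy.deserialize("t", "hello".getBytes(StandardCharsets.UTF_8)));
        // prints hello
    }
}
```

Taking `SerializableDeserializer` in the builder's setters turns the "forgot to make it serializable" runtime failure into a compile-time error.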

Contributor:

Good point. I think I am going to need to fix that on my patch.

@hmcl (Contributor) commented Sep 22, 2016

@srdo @jfenc91 I am on vacation this week (with limited access to the Internet) and will be back on Monday. Can we please hold off on merging this until I can finish my review? I implemented the original patch and would like to review these changes. Thanks.

@srdo (Contributor, Author) commented Sep 22, 2016

@hmcl Sure thing.

@srdo (Contributor, Author) commented Oct 17, 2016

@hmcl ping. Have you had a chance to look at this? It would be nice to get it merged soon.

@srdo (Contributor, Author) commented Nov 3, 2016

@hmcl @revans2 @knusbaum Could you take a look at this at some point? I noticed that two new PRs (#1752, #1753) were opened to solve the same problem, and this one has been radio silent for a while.

@srdo (Contributor, Author) commented Dec 7, 2016

@revans2 ping for review if you have time. I'd like to get this in before too long if possible.

@revans2 (Contributor) commented Dec 8, 2016

This looks good to me. Now that I have gone through the Kafka spout code for my other pull request, I am confident in giving this a +1.

@asfgit asfgit merged commit 29b52e9 into apache:master Dec 8, 2016
@srdo srdo mentioned this pull request Dec 8, 2016
@qiozas commented Feb 27, 2017

Do you plan to port this to the 1.0.x branch too?
We are facing the same issue on the 1.0.3 release (after upgrading from 0.9.6).

@srdo (Contributor, Author) commented Feb 27, 2017

@qiozas If you really need it on 1.0.x, I wouldn't mind porting it. It seems like 1.1.0 is right around the corner, though (RCs are being tested), so it might be faster for you to upgrade when that comes out, since a backport would have to wait for another 1.0.x release.

@qiozas commented Feb 28, 2017

@srdo thank you very much for your offer.

We have performed many tests with the 1.0.2/1.0.3 releases to make sure there are no problems migrating from 0.9.6 to the new version. This issue is our major remaining problem.

Additionally, Storm 1.1.0 will be the first release with the Kafka 0.10 API, so I am not very confident using it in a production system. We would prefer to use the Kafka 0.10 API, but it is too new in the Storm world (Storm actually holds us back on the Kafka 0.9 API, but we can live with that for some time).
If it is not too much trouble, it would be very helpful if you merged this fix to the 1.0.x branch (afterwards we can use it in our code too).

@srdo (Contributor, Author) commented Feb 28, 2017

@qiozas The Kafka 0.10 API changes were more or less a one-liner for the spout, if I recall, so updating shouldn't be a big risk; 0.9 and 0.10 have essentially the same API.

I'll take a look at backporting this soon.

@srdo srdo mentioned this pull request Feb 28, 2017