STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming #2150
Conversation
hmcl left a comment
currentAssignment = newAssignment;
consumer.assign(currentAssignment);
@srdo lines 62 and 63 should be switched. We want to call the listener only once all is set.
Will fix
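For illustration, a minimal, hypothetical sketch of the ordering being asked for here (the class, field, and listener wiring are assumptions, not the actual spout code): the rebalance listener is only notified after both the cached assignment and the consumer's assignment have been updated.

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: notify the listener last, once everything is set.
class ManualAssignmentSketch {
    private final KafkaConsumer<?, ?> consumer;
    private final ConsumerRebalanceListener listener;
    private Set<TopicPartition> currentAssignment = new HashSet<>();

    ManualAssignmentSketch(KafkaConsumer<?, ?> consumer, ConsumerRebalanceListener listener) {
        this.consumer = consumer;
        this.listener = listener;
    }

    void assign(Set<TopicPartition> newAssignment) {
        // Update the cached assignment and the consumer first...
        currentAssignment = newAssignment;
        consumer.assign(currentAssignment);
        // ...then call the listener, only once all is set.
        listener.onPartitionsAssigned(newAssignment);
    }
}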
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public interface TopicFilter {
Must implement Serializable, otherwise the code didn't work for me. The Trident changes already do this.
Will fix
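A minimal sketch of the suggested change, assuming the interface keeps the two methods shown in this diff; the filter instance is serialized as part of the spout config, so it has to be Serializable to survive topology submission.

import java.io.Serializable;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: extend Serializable so the filter can be shipped with the topology.
public interface TopicFilter extends Serializable {

    // Partitions of all topics that pass this filter.
    List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);

    // Human-readable description of the topics that pass the filter, used for logging.
    String getTopicsString();
}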
@Override
public String getTopicsString() {
    return pattern.pattern();
This implementation should be for a method called getTopicsPattern. Please make a diff with my slight modification of your changes. I suggest that you incorporate that diff in this patch.
I'm wondering whether this works. storm-kafka-monitor seems to be getting the subscribed topics out of the getComponentConfiguration method, but is that called more than once? If it isn't called periodically, the list this method returns is going to be empty for pattern subscriptions.
edit: What I'm getting at is that the pattern-based topic string should return all existing topics that match the pattern in order to work with storm-kafka-monitor, but that list isn't known until the spout is running and has requested its subscription. I'm not sure the configuration storm-kafka-monitor is looking at is updated after the topology is built.
@priyank5485 Can you answer the storm-kafka-monitor question here?
@srdo Isn't the kafka monitor a runtime thing? If so, it won't have any values until the topology is running, right? I believe that the approach I am suggesting in the Trident PR will solve what both you and I have in mind. Please take a look at the following:
@hmcl I went and tried out storm-kafka-monitor from the #2174 branch, and ran into multiple issues:
- The PatternTopicFilter code throws an NPE because the spout calls getTopicsString before the Set is initialized (the method is used for logging). This prevents the topology from being submitted.
- It looks like the Kafka consumer code is not being correctly shaded into the storm-kafka-monitor jar, so the bash script didn't work for me. I'm not sure if this was a quirk of my build, since the consumer code is present in the 1.1.0 release storm-kafka-monitor jar.
- After fixing the two other issues, I tried submitting a topology consuming the "test*" pattern from a Kafka instance where the topics "test", "test-1" and "test-2" were present. The Storm UI offset lag request returns the following, no matter how long I wait after the topology is running:
{"kafka-spout":{"spoutId":"kafka-spout","errorInfo":"Unable to get offset lags for kafka. Reason: org.apache.kafka.shaded.common.errors.InvalidTopicException: Topic '' is invalid\n","spoutType":"KAFKA"}}
That code:
commands.add((String) jsonConf.get(configKeyPrefix + "topics"));
Map<String, Object> conf = component.getComponentConfiguration();
I'd also add that I have a few other issues with storm-kafka-monitor:
- The jar is installed along with the cluster, and depends on the Kafka version specified in Storm's root POM. Kafka guarantees backwards compatible client-server communication for one release only, so there's a potential coupling between Storm cluster version and Kafka version. If users want to update the Kafka version in storm-kafka-monitor, they have to rebuild that module and replace the jar in their Storm install.
- The UI integration uses the storm-kafka-monitor Bash script to start the monitoring code, in order to avoid a dependency between storm-core and storm-kafka-monitor. This prevents the UI integration from working on Windows. We could supply a Windows script as well, but then we'd need to keep the two in sync.
I think it makes sense to make the change you are suggesting simply because it makes the spout log a little nicer, but it still leaves the issue with storm-kafka-monitor not working for pattern subscriptions. It would be good if we could figure out a solution to the other issues as well. Maybe we could track offset lag through the metrics system instead, and just make the spout update it?
I'll create a separate issue for discussing storm-kafka-monitor.
@hmcl Sure, I don't see a reason not to have a Pattern getter
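For reference, a hedged sketch of what a pattern-based filter could look like with both the human-readable string and the Pattern getter discussed above. Apart from getTopicsString and getTopicsPattern, which are named in this thread, the class name and structure are illustrative assumptions rather than the actual PR code.

import java.io.Serializable;
import java.util.regex.Pattern;

// Illustrative only: expose both a display string and the underlying Pattern.
public class PatternTopicFilterSketch implements Serializable {
    private final Pattern pattern;

    public PatternTopicFilterSketch(Pattern pattern) {
        this.pattern = pattern;
    }

    // Human-readable form, e.g. for logging.
    public String getTopicsString() {
        return pattern.pattern();
    }

    // The getter suggested above, returning the raw Pattern.
    public Pattern getTopicsPattern() {
        return pattern;
    }
}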
Created the issue for storm-kafka-monitor here: https://issues.apache.org/jira/browse/STORM-2600
@srdo @harshach Some comments related to storm-kafka-monitor.

Regarding the first point: because the lag request is an HTTP pull request from the UI, as long as kafkaSpoutConfig.getSubscription().getTopicsString() returns the correct value it will work, since the open method would have been called eventually. The only change we would need is that when the rebalance listener is invoked, we keep track of the topics currently subscribed. For example, PatternSubscription could have an instance variable such as Set<String> topics that is correctly updated any time onPartitionsRevoked or onPartitionsAssigned is called. We can use that instance variable to return the value when getTopicsString is called on that object. Does that work?

Regarding point 2, we could move the Kafka client version to the storm-kafka-monitor module. Can you elaborate a bit more on your concern? Is it that if the Kafka cluster is upgraded, storm-kafka-monitor won't work? In that case the storm-kafka-client module will also have to be updated and the topology jar rebuilt, correct? I think in general we have a certain compatibility restriction, in that a given Storm release works with, or has been tested with, a certain version of the Kafka clients. Correct me if I am wrong.

Regarding point 3, the main reason for creating a separate module and calling the bash script from the Storm UI is so that storm-core does not have a direct Kafka dependency, since that made sense. For Windows, Cygwin can be a workaround. Plus, I don't know how many Windows users we have. We can start a thread to see if there is really a user base for which we need to support that use case. I don't know the details of how metrics would work. We could get opinions from other people. If that is indeed the right way to go about it, then I am all for changing it.
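A rough sketch of the idea described above, assuming the subscription can register itself as the consumer's ConsumerRebalanceListener; the class name and the exact wiring are assumptions, not the actual PR code.

import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: track the currently assigned topics in the rebalance callbacks,
// so getTopicsString() can report what the spout is actually consuming.
public class PatternSubscriptionSketch implements ConsumerRebalanceListener {
    private final Pattern pattern;
    private final Set<String> topics = new TreeSet<>();

    public PatternSubscriptionSketch(Pattern pattern) {
        this.pattern = pattern;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Drop the old view; the new assignment is delivered in onPartitionsAssigned.
        topics.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            topics.add(tp.topic());
        }
    }

    public String getTopicsString() {
        // Until the first assignment, fall back to the configured pattern.
        return topics.isEmpty() ? pattern.pattern() : String.join(",", topics);
    }
}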
@priyank5485 Thanks for responding, I replied on the linked issue.
The issues raised in https://issues.apache.org/jira/browse/STORM-2600 aren't really related to this PR, since pattern-based subscriptions were already unsupported by storm-kafka-monitor prior to this change. I think we should look at those issues separately.
+1 |
srdo force-pushed from 92e963e to 21e7e30
@srdo I am +1 on this PR. Let's just add the getter method as we agreed in this comment, and have this PR consist of two commits only: the commit from the STORM-2548 PR and its own commit.
@srdo Thanks for your diligence and awesome work refactoring this code. It just made it much better. |
srdo force-pushed from 310683f to eefcf89
@hmcl Sorry, I accidentally overwrote this PR with the STORM-2548 content. It should no longer contain the STORM-2548 changes. I added the getter and fixed the content error.
+1. |
List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);

/**
 * @return A human-readable string representing the subscribed topics.
NIT: representing the topics that pass the filter
srdo force-pushed from a360537 to a25e83a
I also refactored the manual subscriptions a bit to deduplicate the code. I think there's no real issue with backporting this to 1.x, because clearly no one has been using these subscriptions so the API breakage shouldn't bother anyone.
With regard to testing the manual subscription, I'd like to do that in https://issues.apache.org/jira/browse/STORM-2542 instead, since it'll make the current tests use the manual subscription.