-
Notifications
You must be signed in to change notification settings - Fork 4.1k
STORM-2691: Make storm-kafka-client implement the Trident interface correctly #2300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
01328ca to
da7b40f
Compare
|
Rebased to fix conflicts. |
…rident spout interface properly
|
+1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for revisiting too late. I intended to wait for how the discussion went between #2174 and this, and lost tracking on the issue. We are keeping it broken for months: let's not keep it broken just because of dispute for credit. I wish we don't do it again: we are "contributing" our efforts to the project like a donation, so think about the project first, then credit.
+1
@srdo
Looks like this relies on default (actually the code was) so I guess separate patch for 1.x is needed. If then could you craft a patch for 1.x? Thanks in advance!
|
Thanks for the reviews. @HeartSaVioR Sorry, @hmcl and I talked offline a while ago. It's my impression that we're good. I'm not planning to port this to 1.x, since it requires breaking changes. I can't think of a way to fix this without replacing the Subscription interface. |
|
@srdo Regarding breaking backward compatibility, if it is needed to fix "broken" thing, we would want to do that instead of leaving it as broken. Let's raise discussion thread and see there's objection about breaking it. We could get it for 2.0.0 first, and after discussion we could get it for 1.x as well. |
This PR is based on #2271, so please ignore the first commit. The broad thrust of this PR is to fix the storm-kafka-client Trident spout so it implements the Trident API with fewer caveats. The current implementation takes some shortcuts because it used to be necessary to support use of the KafkaConsumer.subscribe API, but it causes some issues listed below.
The changes in STORM-2407: KafkaTridentSpoutOpaque Doesn't Poll Data From All Topic-… #2009 released in 1.1.0 made some changes to the OpaquePartitionedTridentSpoutExecutor that likely broke IOpaquePartitionedTridentSpout implementations other than storm-kafka-client. The changed code used to request sorted partitions from the spout via getOrderedPartitions, do a round-robin partitioning, and assign partitions via refreshPartitions https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100. The new code just passes the output of getOrderedPartitions into refreshPartitions https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120. It looks to me like refreshPartitions is passed the list of all partitions assigned to any spout task, rather than just the partitions assigned to the current task.
The proposed fix will use getOrderedPartitions to get the sorted partitions list, pass the list into getPartitionsForTask, and pass the resulting list of assigned partitions back into refreshPartitions.
The current implementation of the Trident Kafka spout only has one consumer in the Emitter. The Coordinator needs to know which partitions are involved in a batch, and this information is shared via a static field the Emitter writes to. The Coordinator and Emitter are both running as regular bolts, so there's no guarantee that they're in the same JVM.
The Subscription interface doesn't fit Trident very well. Trident assumes that there is a coordinator bolt that will determine which partitions should be involved in a batch, and the emitter bolts should divide the partitions between them. Each emitter should then only emit for the partitions it is assigned. The Subscription interface only has a
subscribe()method that does everything, which means we can't delegate parts of the subscription process to the coordinator.The current spout does a workaround that assumes that the coordinator bolt will be in the same worker as one or more emitters, and just makes the emitter handle the whole subscription process, and put the partition information in a static field. The coordinator then reads this information from the same field. This only works if the coordinator happens to be sharing a worker with the emitters, and causes issues where the coordinator emits a set of partitions, and the emitters happen to do a reassignment before the message arrives.
The suggested fix is to split the subscription process into 4 steps that fit what Trident needs: Get all partitions for the spout tasks, sort the partitions, decide which partitions belong to this task and do the assignment. This allows us to get rid of the shared static field and split the subscription process across the coordinator and emitters. We'll lose a bit of flexibility in implementing Subscriptions, since we're now locking implementations to follow these steps, but I don't think it's really any kind of loss.
I don't think we can port this to 1.x without breaking the public API for the spout. I'm leaning toward not backporting this and leaving it only in 2.x, to respect semantic versioning.
Other small changes: