-
Notifications
You must be signed in to change notification settings - Fork 4.1k
STORM-2225: change spout config to be simpler. (1.x) #1868
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
This is the 1.x version of #1808 |
|
The test failures are unrelated and are around the integration tests that always seem to fail lately. |
| }; | ||
|
|
||
| protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() { | ||
| return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_1, TOPIC_2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we using a hardcoded port here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an exact translation of the original code. Even down to not using KAFKA_LOCAL_BROKER. If people want me to change it I am happy to, but I thought it best to not overreach on the scope of the pull request. At least until the code worked.
|
|
||
| @Override | ||
| public String getTopic(Tuple tuple) { | ||
| if (fieldIndex < tuple.size()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Protect against negative index?
| @@ -1,192 +1,5 @@ | |||
| #Storm Kafka Spout with New Kafka Consumer API | |||
| #Storm Kafka integration using the kafka-client jar | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we call it as Kafka New consumer API. Not many understand its using newer api by calling it as "kafka-client" jar
| public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) { | ||
| this.tupleBuilder = tupleBuilder; | ||
| } | ||
| public class StormStringDeserializer extends StringDeserializer implements SerializableDeserializer<String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this? users can configure the StringSerializer from kafka-clients lib
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we need this even more now. The Kafka Deserializer (including StringDeserializer) is not java serializable. So if we don't do this on a real storm cluster we will get exceptions when we try to write out the spout.
I can look into trying to support some kind of generics like
public <NK> Builder<NK,V> setKey(Class<? extends Deserializer<NK>> keyDeserializer) {
But I really don't know if that works. I'll try it out and let you know.
|
@revans2 overall looks good to me. Minor nits. |
|
+1, Non-binding. |
|
Thanks, @revans2. Still +1, NB. |
17a2e6c to
094d6dc
Compare
|
+1. @revans2 can you squash the commits. |
STORM-2225: change spout config to be simpler. STORM-2228: removed ability to request a single topic go to multiple streams STORM-2236: Reimplemented manual partition management on top of STORM-2225
094d6dc to
a3e6f60
Compare
|
This branch is also in sync with #1808 now tabs are removed and code is squashed. |
|
+1 |
|
@revans2 |
|
It was two jdk8 annotations that I didn't remove as part of rework/upmerging. I'll get them |
|
@HeartSaVioR I removed the unneeded annotation and rebuilt with JDK7 |
|
+1 |
No description provided.