STORM-2225: change spout config to be simpler. #1808
Conversation
|
I updated the docs and the examples. This is a non-backwards compatible change from the 1.x release. I have some code (not included here) that can maintain most compatibility if we really want to, but because of STORM-2228 I decided it would be better to just not support that at all. If we want to add that compatibility for 1.x but deprecated I am OK with that, but would like feedback on it before putting up some kind of a pull request. |
|
I see some duplicate code and documentation. Is the idea to move from /external/storm-kafka to /external/storm-kafka-client? |
docs/storm-kafka-client.md
Outdated
| #Storm Kafka integration using the kafka-client jar | ||
|
|
||
| Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility] (#compatibility)). | ||
| ##Compatability |
Nitpick: Compatibility is misspelled
docs/storm-kafka-client.md
Outdated
| use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility | ||
| reasons. Alternatively you could also specify a different key and message field by using the non default constructor. | ||
| In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. | ||
| These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper. |
Nitpick: and -> an
docs/storm-kafka-client.md
Outdated
| The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published | ||
| You can return a null and the message will be ignored. If you have one static topic name then you can use | ||
| DefaultTopicSelector.java and set the name of the topic in the constructor. | ||
| `FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support decided which topic should to push message from tuple. |
Phrasing is a little awkward here. Maybe something like "... can be used to decide which topic a message should go to based on a field in the tuple" instead?
docs/storm-kafka-client.md
Outdated
| DefaultTopicSelector.java and set the name of the topic in the constructor. | ||
| `FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support decided which topic should to push message from tuple. | ||
| User could specify the field name or field index in tuple ,selector will use this value as topic name which to publish message. | ||
| When the topic name not found , `KafkaBolt` will write messages into default topic . |
"not found" -> "is not found"
Nit: I think it would be clearer if this referred to Field*TopicSelector instead of KafkaBolt, I misunderstood it on first read.
docs/storm-kafka-client.md
Outdated
| `FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support decided which topic should to push message from tuple. | ||
| User could specify the field name or field index in tuple ,selector will use this value as topic name which to publish message. | ||
| When the topic name not found , `KafkaBolt` will write messages into default topic . | ||
| Please make sure the default topic have created . |
have -> has been
|
|
||
| private Collection<TopicPartition> toArrayList(final TopicPartition tp) { | ||
| return new ArrayList<TopicPartition>(1){{add(tp);}}; | ||
| ArrayList<TopicPartition> ret = new ArrayList<>(1); |
Nit: Unless you need to modify the return value later, you can just return Collections.singleton(tp)
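The alternatives being discussed can be sketched side by side. This is an illustrative standalone snippet, not the spout's actual code: a plain String stands in for Kafka's TopicPartition, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

// Sketch of the nit above: when the returned collection is never modified,
// Collections.singleton avoids allocating a one-element ArrayList. It also
// avoids the double-brace idiom from the original code, which creates an
// anonymous inner class that captures the enclosing instance -- a subtle
// hazard when the enclosing class is a Serializable spout.
public class SingletonVsList {
    static <T> Collection<T> asSingleton(T item) {
        return Collections.singleton(item);
    }

    static <T> Collection<T> asArrayList(T item) {
        ArrayList<T> ret = new ArrayList<>(1);
        ret.add(item);
        return ret;
    }

    public static void main(String[] args) {
        System.out.println(asSingleton("partition-0")); // prints [partition-0]
        System.out.println(asArrayList("partition-0")); // prints [partition-0]
    }
}
```

Note that `Collections.singleton` returns an immutable `Set`, so the suggestion only applies when the caller never adds to or removes from the result.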
|
|
||
| private Collection<TopicPartition> toArrayList(final TopicPartition tp) { | ||
| return new ArrayList<TopicPartition>(1){{add(tp);}}; | ||
| ArrayList<TopicPartition> ret = new ArrayList<>(1); |
Nit: Same as in KafkaSpout
| fields = translator.getFieldsFor(stream); | ||
| } else { | ||
| if (!fields.equals(translator.getFieldsFor(stream))) { | ||
| throw new IllegalArgumentException("Trident Spouts do nut support multiple output Fields"); |
Nit: nut -> not. Also maybe rephrase as "must have the same fields for all streams"
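The check under review can be sketched in isolation. Everything below is a hypothetical stand-in (a plain String replaces storm's Fields class) just to show the caching-and-comparison logic:

```java
// Illustrative sketch of the validation quoted above: a trident spout must
// emit the same output fields on every stream, so the fields seen for the
// first registered stream are cached and every later stream is checked
// against them.
public class StreamFieldsCheck {
    private String fields = null; // stand-in for org.apache.storm.tuple.Fields

    public void register(String stream, String streamFields) {
        if (fields == null) {
            fields = streamFields; // first stream fixes the fields
        } else if (!fields.equals(streamFields)) {
            throw new IllegalArgumentException("Trident spouts must have the same"
                + " fields for all streams, but stream " + stream
                + " declares " + streamFields + " instead of " + fields);
        }
    }
}
```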
| return null; | ||
| } | ||
| }); | ||
| KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() { |
Nit: Can leave off the contents of <> on the right hand side in a few places here
In this case, because I am subclassing KafkaBolt, the compiler (at least in Eclipse) actually complains that I am not allowed to do it.
Right, can't do that. Sorry :)
| } | ||
|
|
||
| @Test(expected = IllegalArgumentException.class) | ||
| public void testFiledCollision() { |
filed -> field
|
I like the new configuration design. Backward compatibility is not an issue for us. I'm wondering if enough people have even switched to the new spout yet to make backward compatibility for 1.x a must. |
|
@srdo for me, backwards compatibility for 1.x is more a question of violating our versioning than anything else. |
|
Sure, that makes sense. Maybe storm-kafka-client should have been marked unstable since it's fairly new, it's still in a phase where the kinks are being worked out, even if the version number says differently. |
|
@srdo I think I addressed all of your review comments. |
|
I think my only remaining nit is the spelling error in KafkaBolt. Thanks @revans2. I am +1 on this. |
|
@revans2 I would like to take a look at this before this gets merged in. |
|
@srdo as part of backporting this to 1.x I am going to need to make a change to not use Function directly, because it is only in java 8. So to maintain compatibility between 1.x and 2.x I am going to need to make a few changes in this patch too. |
|
|
||
| ## Committer Sponsors | ||
| * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org)) | ||
| Please see [here](../../docs/storm-kafka-client.md) |
We still need to keep the description because this file will be included to binary dist whereas docs is not.
|
I think I have addressed all of the review comments so far. I will try to get my 1.x version of the patch up shortly. |
| #Storm Kafka integration using the kafka-client jar | ||
|
|
||
| Apache Storm Spout implementation to consume data from Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version Compatibility] (#compatibility)). | ||
| Spouts and Bolts used to interact with Kafka thought the kafka-client library. |
Nit: "Thought" should be "through"
|
All outstanding review comments should be done now. This and the 1.x port at #1868 should be ready for a final pass and hopefully being merged in. |
|
The test failures are unrelated and are around the integration tests that always seem to fail lately. |
|
@srdo @harshach sorry to do this to you, but I just fixed the conflicts with STORM-2236. Sadly the fastest way I could do it was to revert the original code and implement similar functionality, which is the latest commit. Could you please take a look at it? I created a few new Subscription implementations that can do the manual partition management. It only needed a small change to the Spout to support a timeout. I will try to look at adding some documentation and also the impact to the trident spout. |
docs/storm-kafka-client.md
Outdated
| ``` | ||
|
|
||
| #### Wildcard Topics | ||
| Wildcard topics will consume from all topics that are on the configured broakers and match the pattern. So in the following example |
... that exist in the brokers list specified and...
docs/storm-kafka-client.md
Outdated
|
|
||
| #### Wildcard Topics | ||
| Wildcard topics will consume from all topics that are on the configured broakers and match the pattern. So in the following example | ||
| "topic", "topic_foo" and "topic_bar" will all match the pattern, but "not_my_topic" would not match. |
the pattern "topic.*" ... "won't match"
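The matching behavior described in this section can be demonstrated with a plain `java.util.regex.Pattern`. (The spout applies its configured pattern to topic names discovered from the brokers; this standalone snippet only shows which names the example pattern accepts.)

```java
import java.util.regex.Pattern;

public class WildcardTopicDemo {
    public static void main(String[] args) {
        Pattern wildcard = Pattern.compile("topic.*");
        String[] names = {"topic", "topic_foo", "topic_bar", "not_my_topic"};
        for (String name : names) {
            // "topic", "topic_foo" and "topic_bar" match the whole name;
            // "not_my_topic" does not, because matches() anchors at the start.
            System.out.println(name + " -> " + wildcard.matcher(name).matches());
        }
    }
}
```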
|
The test failures are unrelated |
hmcl
left a comment
@revans2 thanks for your patience waiting for the review, and apologies for taking a little longer to review your patch. I am submitting this batch of comments so that you can address them while I go through the remainder of the code. I will finish my comments today.
docs/storm-kafka-client.md
Outdated
| a spout. | ||
|
|
||
| `bootstrapServers` is the same as the Kafka Consumer Property "bootstrap.servers". | ||
| `topics` The topics the spout will consume can either be a `Collection` of specific topic names (1 or more) or a regular expression `Pattern` and any |
... expression, which specifies that any topics ...
docs/storm-kafka-client.md
Outdated
| `topics` The topics the spout will consume can either be a `Collection` of specific topic names (1 or more) or a regular expression `Pattern` and any | ||
| topics that match that regular expression will be consumed. | ||
|
|
||
| In the case of the Constructors you may also need to specify a key deserializer and a value deserializer. This is to help make the java generics happy |
This is to guarantee type safety through the use of Java generics
docs/storm-kafka-client.md
Outdated
| * `UNCOMMITTED_EARLIEST` (DEFAULT) means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `EARLIEST`. | ||
| * `UNCOMMITTED_LATEST` means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `LATEST`. | ||
|
|
||
| `setRecordTranslator` allows you to modify how the spout converts a `ConsumerRecord` into a Tuple and which stream that tuple will go to. By default the "topic", |
Kafka Consumer Record into a Tuple, and which stream that tuple will be published into.
| "partition", "offset", "key", and "value" will be emitted to the "default" stream. If you want to output entries to different streams based on the topic storm | ||
| provides `ByTopicRecordTranslator`. See below for more examples on how to use these. | ||
|
|
||
| `setProp` can be used to set kafka properties that do not have a convenience method. |
"Optional" - should we rename this method to setKafkaProp or setKafkaBrokerProp ?
We are configuring the kafka consumer, so I would assume that it was implied.
docs/storm-kafka-client.md
Outdated
| ### Usage Examples | ||
|
|
||
| #### Create a Simple Insecure Spout | ||
| The following will consume all events published to "topic" and send them to MyBolt as "topic", "partition", "offset", "key", "value". |
... MyBolt using the fields "topic" ...
| } | ||
|
|
||
| /** | ||
| * Intended to be overridden for tests. Make the producer from props |
Create producer with props.
backwards compatibility
No sorry wrong place for that comment.
| private TupleToKafkaMapper<K,V> mapper; | ||
| private KafkaTopicSelector topicSelector; | ||
| private Properties boltSpecfiedProperties = new Properties(); | ||
| private boolean fireAndForget = false; |
Can you please add a comment saying to look at the comments in the set method? I missed those comments at first.
| private KafkaTopicSelector topicSelector; | ||
| private Properties boltSpecfiedProperties = new Properties(); | ||
| private boolean fireAndForget = false; | ||
| private boolean async = true; |
Can you please add a comment saying to look at the comments in the set method? I missed those comments at first.
| } | ||
|
|
||
| /** | ||
| * If set to true the bold will assume that sending a message to kafka will succeed and will ack |
bolt
| /** | ||
| * If set to true the bold will assume that sending a message to kafka will succeed and will ack | ||
| * the tuple as soon as it has handed the message off to the producer API | ||
| * if false (the default) the message will be acked after it we successfully sent to kafka or |
it was successfully
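The two acking modes this javadoc describes can be sketched without Kafka. The interfaces below are simplified stand-ins (SendCallback mirrors the shape of kafka's producer Callback, Ack stands in for the output collector); only the branch structure is meant to match the documented behavior.

```java
import java.util.function.Consumer;

public class FireAndForgetSketch {
    interface Ack { void ack(); void fail(); }                 // stand-in for the collector
    interface SendCallback { void onCompletion(Exception e); } // stand-in for kafka's Callback

    static void send(boolean fireAndForget, Ack tupleAck, Consumer<SendCallback> producerSend) {
        if (fireAndForget) {
            // Hand the record to the producer and ack immediately,
            // assuming the send will succeed.
            producerSend.accept(e -> { /* result ignored */ });
            tupleAck.ack();
        } else {
            // Default: ack or fail only once the producer reports the result.
            producerSend.accept(e -> {
                if (e == null) {
                    tupleAck.ack();
                } else {
                    tupleAck.fail();
                }
            });
        }
    }
}
```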
| @@ -1,191 +1,5 @@ | |||
| #Storm Kafka Spout with New Kafka Consumer API | |||
| #Storm Apache Kafka integration using the kafka-client jar (This inclused the new Apache Kafka consumer API) | |||
inclused -> includes
| * <p/> | ||
| * It expects the producer configuration and topic in storm config under | ||
| * <p/> | ||
| * 'kafka.broker.properties' and 'topic' |
Is kafka.broker.properties still being used somewhere? It might be good to mention that new code should prefer withTopicSelector over using 'topic' in storm conf.
| } | ||
|
|
||
| /** | ||
| * If set to true the bold will assume that sending a message to kafka will succeed and will ack |
bold -> bolt
| private final Map<String, RecordTranslator<K,V>> topicToTranslator = new HashMap<>(); | ||
| private final Map<String, Fields> streamToFields = new HashMap<>(); | ||
|
|
||
| public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) { |
Nit: This class could do with some javadoc
| private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed | ||
| private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() | ||
| private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed | ||
| private transient TopologyContext context; |
Nit: The rest of the file uses spaces for indentation
| public Deserializer<K> getKeyDeserializer() { | ||
| return keyDeserializer; | ||
| if (keyDesClazz != null) { | ||
| try { |
Tabs
| * If set to true, the consumer manage partition manually, otherwise it will rely on kafka to do partition assignment. | ||
| * @param manualPartitionAssignment True if using manual partition assignment. | ||
| * Sets partition refresh period in milliseconds. This is how often the subscription is refreshed | ||
| * For most subscriptions that go through the KafkaConsumer.subscribe this is ignored. |
This seems vague, and requires the user to go look at the subscription source. Maybe either link the method in Subscription that must be overridden to make this apply, or list the subscriptions that don't ignore this setting
| } | ||
|
|
||
| /** | ||
| * @return a string representing the subscribed topics. |
Can this string be in any format? It seems to only be used in getComponentConfiguration, not really sure where that's being read.
I didn't know either, but the only place I can see it used is in logging so that is what I did.
| * It works for sending tuples to older Kafka version (0.8.1). | ||
| * @deprecated Please use the KafkaBolt in storm-kafka-client | ||
| */ | ||
| @Deprecated |
Maybe it would be fine to just remove it here immediately and deprecate it in 1.1.0?
I would rather do that in a follow-on JIRA. Most of the code is the same between the two, so porting the rework from here to there is a lot simpler if I can do a cherry-pick.
I filed https://issues.apache.org/jira/browse/STORM-2317 to do this and I assigned it to myself.
| * The maximum number of records a poll will return. | ||
| * Will only work with Kafka 0.10.0 and above. | ||
| */ | ||
| public Builder<K,V> setMaxPoolRecords(int records) { |
maxPool -> maxPoll
hmcl
left a comment
@revans2 This completes my review
| import java.io.Serializable; | ||
|
|
||
| /** | ||
| * as the really verbose name suggests this interface maps a storm tuple to kafka key and message. |
I would delete "as the really verbose name suggests this " and put "interface defining a mapping from storm tuple to kafka key and message"
| if (fieldIndex < tuple.size()) { | ||
| return tuple.getString(fieldIndex); | ||
| } else { | ||
| LOG.warn("Field Index " + fieldIndex + " Out of bound . Using default topic " + defaultTopicName); |
bounds. Returning default ...
| } | ||
|
|
||
| @Override | ||
| public String getTopic(Tuple tuple) { |
This logic could be incorporated in the FieldIndexTopicSelector. It should be possible to delete this class, rename FieldIndexTopicSelector to FieldTopicSelector, and provide two constructors. One with fieldName and another with fieldIndex. And then use Tuple#fieldIndex(String field) method to extract one from the other.
I agree but both for backwards compatibility and to limit the scope of this JIRA I would rather see it done in a follow on JIRA
I filed https://issues.apache.org/jira/browse/STORM-2318 for this
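The fallback behavior being discussed can be shown without depending on storm's classes; SimpleTuple below is a hypothetical stand-in for org.apache.storm.tuple.Tuple, and the method mirrors the getTopic logic quoted above:

```java
public class IndexSelectorSketch {
    // Hypothetical stand-in for org.apache.storm.tuple.Tuple
    interface SimpleTuple {
        int size();
        String getString(int index);
    }

    // Read the topic name from the tuple field at fieldIndex, falling back
    // to the default topic (which must already exist on the brokers) when
    // the index is out of bounds.
    static String getTopic(SimpleTuple tuple, int fieldIndex, String defaultTopicName) {
        if (fieldIndex < tuple.size()) {
            return tuple.getString(fieldIndex);
        }
        return defaultTopicName;
    }

    public static void main(String[] args) {
        SimpleTuple tuple = new SimpleTuple() {
            private final String[] values = {"orders", "some-key", "some-value"};
            public int size() { return values.length; }
            public String getString(int index) { return values[index]; }
        };
        System.out.println(getTopic(tuple, 0, "default-topic")); // prints orders
        System.out.println(getTopic(tuple, 9, "default-topic")); // prints default-topic
    }
}
```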
| * Uses field with a given index to select the topic name from a tuple . | ||
| */ | ||
| public class FieldIndexTopicSelector implements KafkaTopicSelector { | ||
| private static final long serialVersionUID = -3830575380208166367L; |
Why does this class and other classes (e.g. ByTopicRecordTranslator) have serialVersionUID but others (e.g. DefaultTopicSelector, FieldNameTopicSelector) don't?
Because I missed them.
| Fields fromTrans = translator.getFieldsFor(stream); | ||
| Fields cached = streamToFields.get(stream); | ||
| if (cached != null && !fromTrans.equals(cached)) { | ||
| throw new IllegalArgumentException("Stream " + stream + " currently has Fields of " + cached + " which is not the same as those being added in " + fromTrans); |
IllegalStateException ?
In this case it is the argument being passed in that is bad and we are rejecting it. They could come back and switch it to a new stream which would be fine.
| private TridentTupleToKafkaMapper mapper; | ||
| private KafkaTopicSelector topicSelector; | ||
|
|
||
| public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { |
Wouldn't it be preferable for the class to be immutable?
This is code that was "moved", like with the KafkaBolt, from storm-kafka to storm-kafka-client. If we really want to make it immutable we can, but I think that is beyond the scope of this JIRA.
| } | ||
|
|
||
| public void prepare(Properties options) { | ||
| if (mapper == null) throw new NullPointerException("mapper can not be null"); |
Objects.requireNonNull(mapper, "mapper cannot be null");
...
Objects does not exist in java 6 and I would prefer to keep the code as compatible as possible to avoid extra rework when pulling these changes back. If you insist I will do it.
I thought 1.x was targeting at least Java 7?
@srdo you are right.
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
I'll update it.
| import java.io.Serializable; | ||
|
|
||
| public interface KafkaTopicSelector extends Serializable { | ||
| String getTopic(Tuple tuple); |
since in some scenarios it's OK for this method to return null, wouldn't it be much clearer to return Optional&lt;String&gt; instead?
File another JIRA and we can look into it. I think it is beyond the scope of this one.
IMHO we need to try to avoid JDK 8 features in the public API unless we decide to release only a minor version for 2.0.0. But anyway, I also think this is beyond the scope.
|
|
||
| @Override | ||
| public K getKeyFromTuple(TridentTuple tuple) { | ||
| return (K) tuple.getValueByField(keyFieldName); |
ClassCastException?
It is possible but that is the point of the generics. To try and reduce the likelihood of it happening.
| } | ||
|
|
||
| @Override | ||
| public V getMessageFromTuple(TridentTuple tuple) { |
ClassCastException?
It is possible but that is the point of the generics. To try and reduce the likelihood of it happening.
|
@hmcl I think I addressed all of your review comments. I also re-based to deal with the merge conflict. If everything looks good I will squash some of the commits. There are a lot. |
|
The travis test failures are unrelated. |
|
There are still a few files with tabs mixed in with spaces, so autoformatting all the files might be good. I also still don't really understand why the KafkaSpoutConfig builder needs to support setting a deserializer with a different generic type than the one the builder was instantiated with. +1 though. |
|
+1 |
|
+1 My only suggestion at this point is that we file an umbrella JIRA covering some of the follow-up comments that haven't been addressed, such as minor refactoring, improved (java)docs, immutability, etc. Basically, to capture the most meaningful comments that make sense to handle later. Thanks @revans2 for all your patience addressing the many review comments |
STORM-2225: change spout config to be simpler. STORM-2228: removed ability to request a single topic go to multiple streams STORM-2236: Reimplemented manual partition management on top of STORM-2225
|
Everyone, Sorry this took so long. I kept getting pulled off on critical issues. I have rebased/squashed everything (except the code revert) and also fixed the issue with tabs vs spaces. Since it was just white space issues I don't think I need more +1s but if someone wants to take a look again please feel free to. I will do the same for the 1.x version, and then file the umbrella jira that @hmcl requested. |
|
@hmcl GitHub likes to truncate long messages. If you really want me to squash the code change with the revert and only reference a single JIRA in the commit messages I can. But since I am addressing 4 separate (but very closely related) JIRAs, I thought it would be better to keep them all together. |
|
@revans2 this is perfect. These 4 commits make it much easier to track down. I was confused by the way GitHub truncates the messages - my bad. +1 |
|
Thanks for the great work and patience. +1 |
STORM-2228: removed ability to request a single topic go to multiple streams