9 changes: 8 additions & 1 deletion external/storm-kafka-client/pom.xml
@@ -71,7 +71,14 @@
<!--test dependencies -->
<dependency>
    <groupId>org.mockito</groupId>
-   <artifactId>mockito-all</artifactId>
+   <artifactId>mockito-core</artifactId>
    <scope>test</scope>
+   <version>${mockito.version}</version>
</dependency>
+<dependency>
+   <groupId>org.hamcrest</groupId>
+   <artifactId>hamcrest-all</artifactId>
+   <version>1.3</version>
+   <scope>test</scope>
+</dependency>
<dependency>
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -52,6 +52,8 @@
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;

public class KafkaSpout<K, V> extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
@@ -62,6 +64,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {

    // Kafka
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+   private final KafkaConsumerFactory kafkaConsumerFactory;
    private transient KafkaConsumer<K, V> kafkaConsumer;
    private transient boolean consumerAutoCommitMode;
@@ -84,8 +87,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {


    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+       this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+   }
+
+   //This constructor is here for testing
+   KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
        this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+       this.kafkaConsumerFactory = kafkaConsumerFactory;
    }

    @Override
@@ -145,6 +154,10 @@ private void initialize(Collection<TopicPartition> partitions) {
        }

        retryService.retainAll(partitions);
+
+       //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+       Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
+       emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
Contributor: This looks good. I think this same logic may be needed in onPartitionsRevoked as well. Also, I believe the message may need to be removed from the retryService as well. Please correct me if I am wrong!

Contributor Author: The messages should be getting removed from retryService in line 156. It's my impression that onPartitionsAssigned will be getting called immediately after onPartitionsRevoked, before the current call to poll returns (see https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html).


        for (TopicPartition tp : partitions) {
            final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
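For reference on the rebalance-ordering point discussed in the thread above, here is a minimal standalone sketch; it is illustrative only, not part of this patch, and the broker address, group id, topic name, and class name are placeholders. During a rebalance triggered inside poll(), the consumer invokes onPartitionsRevoked first and onPartitionsAssigned immediately after, both before that poll() call returns records.

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceOrderingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "rebalance-sketch");        // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Fires first, while the previous assignment is still known.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Fires immediately after revocation, with the new assignment,
                    // before the in-flight poll() call returns records.
                }
            });
            consumer.poll(1000); // both callbacks run inside this call during a rebalance
        }
    }
}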
@@ -289,16 +302,19 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
final List<Object> tuple = tuplesBuilder.buildTuple(record);
kafkaSpoutStreams.emit(collector, tuple, msgId);
emitted.add(msgId);
numUncommittedOffsets++;
if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
retryService.remove(msgId); // re-emitted hence remove from failed
} else {
boolean isScheduled = retryService.isScheduled(msgId);
if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
final List<Object> tuple = tuplesBuilder.buildTuple(record);
kafkaSpoutStreams.emit(collector, tuple, msgId);
emitted.add(msgId);
numUncommittedOffsets++;
if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
retryService.remove(msgId);
}
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
return true;
}
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
return true;
}
return false;
}
@@ -333,6 +349,11 @@ private void commitOffsetsForAckedTuples() {
    @Override
    public void ack(Object messageId) {
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+       if(!emitted.contains(msgId)) {
+           LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+           return;
+       }
+
        if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
            acked.get(msgId.getTopicPartition()).add(msgId);
        }
@@ -344,8 +365,12 @@ public void ack(Object messageId) {
    @Override
    public void fail(Object messageId) {
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-       emitted.remove(msgId);
+       if(!emitted.contains(msgId)) {
+           LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+           return;
+       }
        if (msgId.numFails() < maxRetries) {
+           emitted.remove(msgId);
            msgId.incrementNumFails();
            retryService.schedule(msgId);
        } else { // limit to max number of retries
@@ -362,8 +387,7 @@ public void activate() {
    }

    private void subscribeKafkaConsumer() {
-       kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
-               kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+       kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);

        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
            final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -105,8 +105,8 @@ private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafk

    public static class Builder<K,V> {
        private final Map<String, Object> kafkaProps;
-       private Deserializer<K> keyDeserializer;
-       private Deserializer<V> valueDeserializer;
+       private SerializableDeserializer<K> keyDeserializer;
+       private SerializableDeserializer<V> valueDeserializer;
        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
        private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
        private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -164,15 +164,15 @@ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
        /**
         * Specifying this key deserializer overrides the property key.deserializer
         */
-       public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+       public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
            this.keyDeserializer = keyDeserializer;
            return this;
        }

        /**
         * Specifying this value deserializer overrides the property value.deserializer
         */
-       public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+       public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
            this.valueDeserializer = valueDeserializer;
            return this;
        }
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -0,0 +1,25 @@
/*
* Copyright 2016 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout;

import java.io.Serializable;
import org.apache.kafka.common.serialization.Deserializer;

/**
* @param <T> The type this deserializer deserializes to.
*/
public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {
}

Contributor: Why do we need this wrapper marking interface?

Contributor Author: I thought it was nice to have, since setKey/ValueDeserializer in the builder implicitly requires the deserializer to be serializable. For example, if you try to set the standard Kafka StringDeserializer via those methods, you'll get a NotSerializableException when the topology is submitted to Storm, since they'll be set as fields on the final KafkaSpoutConfig field in KafkaSpout.

Contributor: Good point. I think I am going to need to fix that on my patch.
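To make the NotSerializableException point concrete, here is a hypothetical implementation (not part of this patch; the class name is made up) that would be safe to pass to setKeyDeserializer/setValueDeserializer. It re-implements string deserialization directly instead of holding Kafka's non-serializable StringDeserializer as a field:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.storm.kafka.spout.SerializableDeserializer;

public class SerializableStringDeserializer implements SerializableDeserializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure.
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // Nothing to close.
    }
}

An instance of this class can then be stored as a field on KafkaSpoutConfig and shipped with the topology without a serialization error at submission time.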
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -0,0 +1,27 @@
/*
* Copyright 2016 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout.internal;

import java.io.Serializable;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

/**
* This is here to enable testing
*/
public interface KafkaConsumerFactory<K, V> extends Serializable {
    public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
}
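As a sketch of how this factory enables testing (a hypothetical helper, not code from this PR; the class and method names are made up), a test can inject a factory that returns a Mockito mock instead of opening a real broker connection. It must live in the org.apache.storm.kafka.spout package so the package-private test constructor added above is visible:

package org.apache.storm.kafka.spout; // shares the spout's package to reach the package-private constructor

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.mockito.Mockito;

public class KafkaSpoutMockFactoryExample {

    @SuppressWarnings("unchecked")
    public static KafkaSpout<String, String> spoutWithMockedConsumer(KafkaSpoutConfig<String, String> config) {
        // The mock stands in for a live broker connection; tests can stub poll()/commit behavior on it.
        final KafkaConsumer<String, String> mockConsumer = Mockito.mock(KafkaConsumer.class);
        KafkaConsumerFactory<String, String> factory = new KafkaConsumerFactory<String, String>() {
            @Override
            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
                return mockConsumer;
            }
        };
        // Injects the factory through the package-private constructor introduced in this patch.
        return new KafkaSpout<>(config, factory);
    }
}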
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
/*
* Copyright 2016 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout.internal;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {

    @Override
    public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
    }

}