Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;

import org.apache.kafka.common.errors.InterruptException;

public class KafkaSpout<K, V> extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
Expand Down Expand Up @@ -212,22 +214,32 @@ private void setAcked(TopicPartition tp, long fetchOffset) {

@Override
public void nextTuple() {
if (initialized) {
if (commit()) {
commitOffsetsForAckedTuples();
}
try{
if (initialized) {
if (commit()) {
commitOffsetsForAckedTuples();
}

if (poll()) {
setWaitingToEmit(pollKafkaBroker());
}
if (poll()) {
setWaitingToEmit(pollKafkaBroker());
}

if (waitingToEmit()) {
emit();
if (waitingToEmit()) {
emit();
}
} else {
LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
}
} else {
LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}

private void throwKafkaConsumerInterruptedException() {
//Kafka throws their own type of exception when interrupted.
//Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt.
throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
}

private boolean commit() {
return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
Expand Down Expand Up @@ -383,7 +395,11 @@ public void fail(Object messageId) {

@Override
public void activate() {
subscribeKafkaConsumer();
try {
subscribeKafkaConsumer();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}

private void subscribeKafkaConsumer() {
Expand All @@ -405,12 +421,20 @@ private void subscribeKafkaConsumer() {

@Override
public void deactivate() {
shutdown();
try {
shutdown();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}

@Override
public void close() {
shutdown();
try {
shutdown();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
Copy link
Contributor

@hmcl hmcl Dec 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srdo Can you please create a constant for all of these fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hmcl Better?

}

private void shutdown() {
Expand Down