diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index ab7227ddea7..d405c4da874 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -55,6 +55,8 @@ import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.kafka.common.errors.InterruptException; + public class KafkaSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); private static final Comparator OFFSET_COMPARATOR = new OffsetComparator(); @@ -212,22 +214,32 @@ private void setAcked(TopicPartition tp, long fetchOffset) { @Override public void nextTuple() { - if (initialized) { - if (commit()) { - commitOffsetsForAckedTuples(); - } + try{ + if (initialized) { + if (commit()) { + commitOffsetsForAckedTuples(); + } - if (poll()) { - setWaitingToEmit(pollKafkaBroker()); - } + if (poll()) { + setWaitingToEmit(pollKafkaBroker()); + } - if (waitingToEmit()) { - emit(); + if (waitingToEmit()) { + emit(); + } + } else { + LOG.debug("Spout not initialized. Not sending tuples until initialization completes"); } - } else { - LOG.debug("Spout not initialized. Not sending tuples until initialization completes"); + } catch (InterruptException e) { + throwKafkaConsumerInterruptedException(); } } + + private void throwKafkaConsumerInterruptedException() { + //Kafka throws their own type of exception when interrupted. + //Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt. + throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted")); + } private boolean commit() { return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode @@ -383,7 +395,11 @@ public void fail(Object messageId) { @Override public void activate() { - subscribeKafkaConsumer(); + try { + subscribeKafkaConsumer(); + } catch (InterruptException e) { + throwKafkaConsumerInterruptedException(); + } } private void subscribeKafkaConsumer() { @@ -405,12 +421,20 @@ private void subscribeKafkaConsumer() { @Override public void deactivate() { - shutdown(); + try { + shutdown(); + } catch (InterruptException e) { + throwKafkaConsumerInterruptedException(); + } } @Override public void close() { - shutdown(); + try { + shutdown(); + } catch (InterruptException e) { + throwKafkaConsumerInterruptedException(); + } } private void shutdown() {