diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 4389acb50e5..aa5d6e04719 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -329,7 +329,15 @@ private void commitOffsetsForAckedTuples() { public void ack(Object messageId) { final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically - acked.get(msgId.getTopicPartition()).add(msgId); + final MessageId msgId = (MessageId) messageId; + TopicPartition partition = msgId.getTopicPartition(); + OffsetEntry entry = acked.get(partition); + if (entry == null) { + LOG.warn("Tried to add message with id {} to non-existing partition {}.", msgId, partition); + return; + } + entry.add(msgId); + LOG.debug("Added acked message [{}] to list of messages to be committed to Kafka", msgId); } emitted.remove(msgId); }