diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java index 94bf1348228..88f3cd1cb4b 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java @@ -23,6 +23,7 @@ import backtype.storm.metric.api.ReducedMetric; import backtype.storm.task.TopologyContext; import com.google.common.collect.ImmutableMap; +import kafka.api.OffsetRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; @@ -104,6 +105,14 @@ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition } if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) { offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime); + if(_config.useStartOffsetTimeIfOffsetOutOfRange){ + long nextOffset = (Long) lastMeta.get("nextOffset"); + long earliestAvailableOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, OffsetRequest.EarliestTime()); + long latestAvailableOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, OffsetRequest.LatestTime()); + if(earliestAvailableOffset <= nextOffset && nextOffset <= latestAvailableOffset) { // in the offset range. + offset = nextOffset; + } + } } else { offset = (Long) lastMeta.get("nextOffset"); }