diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 0878fdf7baa..92b666d1c66 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -76,7 +76,13 @@ org.hamcrest - hamcrest-all + hamcrest-core + 1.3 + test + + + org.hamcrest + hamcrest-library 1.3 test @@ -94,7 +100,6 @@ org.slf4j log4j-over-slf4j - ${log4j-over-slf4j.version} test diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 9ad2be294bf..bbad9e84f49 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -25,16 +25,13 @@ import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -48,6 +45,7 @@ import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.internal.Timer; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -58,19 +56,19 @@ public class KafkaSpout extends BaseRichSpout { private static final long serialVersionUID = 4151921085047987154L; + //Initial delay for the commit and subscription refresh timers + public static final long TIMER_DELAY_MS = 500; private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); - private static final Comparator OFFSET_COMPARATOR = new OffsetComparator(); // Storm protected SpoutOutputCollector collector; // Kafka private final KafkaSpoutConfig kafkaSpoutConfig; - private final KafkaConsumerFactory kafkaConsumerFactory; + private KafkaConsumerFactory kafkaConsumerFactory; private transient KafkaConsumer kafkaConsumer; private transient boolean consumerAutoCommitMode; - // Bookkeeping private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure @@ -78,7 +76,7 @@ public class KafkaSpout extends BaseRichSpout { private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() - transient Map acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used if it's AutoCommitMode + private transient Map acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate private transient Set emitted; // Tuples that have been emitted but that are "on the wire", i.e. 
pending being acked or failed. Not used if it's AutoCommitMode private transient Iterator> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode @@ -87,13 +85,13 @@ public class KafkaSpout extends BaseRichSpout { public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { - this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault()); + this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault()); } //This constructor is here for testing KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig, KafkaConsumerFactory kafkaConsumerFactory) { - this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration this.kafkaConsumerFactory = kafkaConsumerFactory; + this.kafkaSpoutConfig = kafkaSpoutConfig; } @Override @@ -114,9 +112,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect retryService = kafkaSpoutConfig.getRetryService(); if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually - commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); + commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } - refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); + refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); acked = new HashMap<>(); emitted = new HashSet<>(); @@ -204,7 +202,7 @@ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { private void setAcked(TopicPartition tp, long fetchOffset) { // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off if (!consumerAutoCommitMode && !acked.containsKey(tp)) { - acked.put(tp, new OffsetEntry(tp, fetchOffset)); + acked.put(tp, new OffsetManager(tp, fetchOffset)); } } @@ -296,7 +294,7 @@ private void doSeekRetriableTopicPartitions() { if (offsetAndMeta != null) { kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle } else { - kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to last committed offset + kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1); // Seek to last committed offset } } } @@ -353,7 +351,7 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord record) { private void commitOffsetsForAckedTuples() { // Find offsets that are ready to be committed for every topic partition final Map nextCommitOffsets = new HashMap<>(); - for (Map.Entry tpOffset : acked.entrySet()) { + for (Map.Entry tpOffset : acked.entrySet()) { final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(); if (nextCommitOffset != null) { nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); @@ -366,9 +364,14 @@ private void commitOffsetsForAckedTuples() { LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets); // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop - for (Map.Entry tpOffset : acked.entrySet()) { - final OffsetEntry offsetEntry = 
tpOffset.getValue(); - offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey())); + for (Map.Entry tpOffset : nextCommitOffsets.entrySet()) { + //Update the OffsetManager for each committed partition, and update numUncommittedOffsets + final TopicPartition tp = tpOffset.getKey(); + final OffsetManager offsetManager = acked.get(tp); + long numCommittedOffsets = offsetManager.commit(tpOffset.getValue()); + numUncommittedOffsets -= numCommittedOffsets; + LOG.debug("[{}] uncommitted offsets across all topic partitions", + numUncommittedOffsets); } } else { LOG.trace("No offsets to commit. {}", this); @@ -489,127 +492,7 @@ public Map getComponentConfiguration () { private String getTopicsString() { return kafkaSpoutConfig.getSubscription().getTopicsString(); } +} - // ======= Offsets Commit Management ========== - - private static class OffsetComparator implements Comparator { - public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { - return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1; - } - } - - /** - * This class is not thread safe - */ - class OffsetEntry { - private final TopicPartition tp; - private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. - * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ - private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1 - private final NavigableSet ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset - - public OffsetEntry(TopicPartition tp, long initialFetchOffset) { - this.tp = tp; - this.initialFetchOffset = initialFetchOffset; - this.committedOffset = initialFetchOffset - 1; - LOG.debug("Instantiated {}", this); - } - - public void add(KafkaSpoutMessageId msgId) { // O(Log N) - ackedMsgs.add(msgId); - } - - /** - * An offset is only committed when all records with lower offset have - * been acked. This guarantees that all offsets smaller than the - * committedOffset have been delivered. - * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit. - */ - public OffsetAndMetadata findNextCommitOffset() { - boolean found = false; - long currOffset; - long nextCommitOffset = committedOffset; - KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata - - for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap - if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit - found = true; - nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; - } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search - LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); - break; - } else { - //Received a redundant ack. Ignore and continue processing. - LOG.warn("topic-partition [{}] has unexpected offset [{}]. 
Current committed Offset [{}]", - tp, currOffset, committedOffset); - } - } - - OffsetAndMetadata nextCommitOffsetAndMetadata = null; - if (found) { - nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); - LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); - } else { - LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); - } - LOG.trace("{}", this); - return nextCommitOffsetAndMetadata; - } - - /** - * Marks an offset has committed. This method has side effects - it sets the internal state in such a way that future - * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any. - * - * @param committedOffset offset to be marked as committed - */ - public void commit(OffsetAndMetadata committedOffset) { - long numCommittedOffsets = 0; - if (committedOffset != null) { - final long oldCommittedOffset = this.committedOffset; - numCommittedOffsets = committedOffset.offset() - this.committedOffset; - this.committedOffset = committedOffset.offset(); - for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext(); ) { - if (iterator.next().offset() <= committedOffset.offset()) { - iterator.remove(); - } else { - break; - } - } - numUncommittedOffsets-= numCommittedOffsets; - LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions", - oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets); - } else { - LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions", - numCommittedOffsets, tp, numUncommittedOffsets); - } - LOG.trace("{}", this); - } - - long getCommittedOffset() { - return committedOffset; - } - - public boolean isEmpty() { - return ackedMsgs.isEmpty(); - } - public boolean contains(ConsumerRecord record) { - return contains(new KafkaSpoutMessageId(record)); - } - - public boolean contains(KafkaSpoutMessageId msgId) { - return ackedMsgs.contains(msgId); - } - @Override - public String toString() { - return "OffsetEntry{" + - "topic-partition=" + tp + - ", fetchOffset=" + initialFetchOffset + - ", committedOffset=" + committedOffset + - ", ackedMsgs=" + ackedMsgs + - '}'; - } - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java new file mode 100755 index 00000000000..4ce04718f27 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -0,0 +1,157 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout.internal; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.NavigableSet; +import java.util.TreeSet; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages acked and committed offsets for a TopicPartition. This class is not thread safe + */ +public class OffsetManager { + + private static final Comparator OFFSET_COMPARATOR = new OffsetComparator(); + private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class); + private final TopicPartition tp; + /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. + * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ + private final long initialFetchOffset; + // Last offset committed to Kafka. Initially it is set to fetchOffset - 1 + private long committedOffset; + // Acked messages sorted by ascending order of offset + private final NavigableSet ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); + + public OffsetManager(TopicPartition tp, long initialFetchOffset) { + this.tp = tp; + this.initialFetchOffset = initialFetchOffset; + this.committedOffset = initialFetchOffset - 1; + LOG.debug("Instantiated {}", this); + } + + public void add(KafkaSpoutMessageId msgId) { // O(Log N) + ackedMsgs.add(msgId); + } + + /** + * An offset is only committed when all records with lower offset have been + * acked. This guarantees that all offsets smaller than the committedOffset + * have been delivered. + * + * @return the next OffsetAndMetadata to commit, or null if no offset is + * ready to commit. + */ + public OffsetAndMetadata findNextCommitOffset() { + boolean found = false; + long currOffset; + long nextCommitOffset = committedOffset; + KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata + + for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap + if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit + found = true; + nextCommitMsg = currAckedMsg; + nextCommitOffset = currOffset; + } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search + LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); + break; + } else { + //Received a redundant ack. Ignore and continue processing. + LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]", + tp, currOffset, committedOffset); + } + } + + OffsetAndMetadata nextCommitOffsetAndMetadata = null; + if (found) { + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); + LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); + } else { + LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); + } + LOG.trace("{}", this); + return nextCommitOffsetAndMetadata; + } + + /** + * Marks an offset as committed. 
This method has side effects - it sets the + * internal state in such a way that future calls to + * {@link #findNextCommitOffset()} will return offsets greater than the + * offset specified, if any. + * + * @param committedOffset offset to be marked as committed + * @return Number of offsets committed in this commit + */ + public long commit(OffsetAndMetadata committedOffset) { + long preCommitCommittedOffsets = this.committedOffset; + long numCommittedOffsets = committedOffset.offset() - this.committedOffset; + this.committedOffset = committedOffset.offset(); + for (Iterator iterator = ackedMsgs.iterator(); iterator.hasNext();) { + if (iterator.next().offset() <= committedOffset.offset()) { + iterator.remove(); + } else { + break; + } + } + LOG.trace("{}", this); + + LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].", + preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp); + + return numCommittedOffsets; + } + + public long getCommittedOffset() { + return committedOffset; + } + + public boolean isEmpty() { + return ackedMsgs.isEmpty(); + } + + public boolean contains(ConsumerRecord record) { + return contains(new KafkaSpoutMessageId(record)); + } + + public boolean contains(KafkaSpoutMessageId msgId) { + return ackedMsgs.contains(msgId); + } + + @Override + public String toString() { + return "OffsetManager{" + + "topic-partition=" + tp + + ", fetchOffset=" + initialFetchOffset + + ", committedOffset=" + committedOffset + + ", ackedMsgs=" + ackedMsgs + + '}'; + } + + private static class OffsetComparator implements Comparator { + + @Override + public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { + return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1; + } + } +} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java index d51104df82f..2a2e1cb69c1 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java @@ -18,6 +18,7 @@ package org.apache.storm.kafka.spout.internal; import java.util.concurrent.TimeUnit; +import org.apache.storm.utils.Time; public class Timer { private final long delay; @@ -41,7 +42,7 @@ public Timer(long delay, long period, TimeUnit timeUnit) { this.timeUnit = timeUnit; periodNanos = timeUnit.toNanos(period); - start = System.nanoTime() + timeUnit.toNanos(delay); + start = Time.nanoTime() + timeUnit.toNanos(delay); } public long period() { @@ -65,9 +66,9 @@ public TimeUnit getTimeUnit() { * otherwise. 
*/ public boolean isExpiredResetOnTrue() { - final boolean expired = System.nanoTime() - start > periodNanos; + final boolean expired = Time.nanoTime() - start >= periodNanos; if (expired) { - start = System.nanoTime(); + start = Time.nanoTime(); } return expired; } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java index ea0b6e73f37..abc58f0e8e5 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java @@ -17,7 +17,7 @@ */ package org.apache.storm.kafka.spout; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.HashSet; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java index f4275e49d10..681953d1f77 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java @@ -17,7 +17,7 @@ */ package org.apache.storm.kafka.spout; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.util.Arrays; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 08220dd207e..57e01205d69 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -17,7 +17,9 @@ */ package org.apache.storm.kafka.spout; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.HashMap; diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 9969d84eada..6a0a63ed97a 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -21,10 +21,10 @@ import static org.junit.Assert.assertThat; import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -44,6 +44,8 @@ import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Before; import org.junit.Test; import 
org.mockito.ArgumentCaptor; @@ -56,18 +58,16 @@ public class KafkaSpoutRebalanceTest { @Captor private ArgumentCaptor> commitCapture; - private TopologyContext contextMock; - private SpoutOutputCollector collectorMock; - private Map conf; + private final long offsetCommitPeriodMs = 2_000; + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map conf = new HashMap<>(); private KafkaConsumer consumerMock; private KafkaConsumerFactory consumerFactoryMock; @Before public void setUp() { MockitoAnnotations.initMocks(this); - contextMock = mock(TopologyContext.class); - collectorMock = mock(SpoutOutputCollector.class); - conf = new HashMap<>(); consumerMock = mock(KafkaConsumer.class); consumerFactoryMock = new KafkaConsumerFactory(){ @Override @@ -99,9 +99,9 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti Map>> secondPartitionRecords = new HashMap<>(); secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value"))); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPartitionRecords)) - .thenReturn(new ConsumerRecords(secondPartitionRecords)) - .thenReturn(new ConsumerRecords(Collections.emptyMap())); + .thenReturn(new ConsumerRecords(firstPartitionRecords)) + .thenReturn(new ConsumerRecords(secondPartitionRecords)) + .thenReturn(new ConsumerRecords(Collections.emptyMap())); //Emit the messages spout.nextTuple(); @@ -115,7 +115,7 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti //Now rebalance consumerRebalanceListener.onPartitionsRevoked(assignedPartitions); consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition)); - + List emittedMessageIds = new ArrayList<>(); emittedMessageIds.add(messageIdForRevokedPartition.getValue()); emittedMessageIds.add(messageIdForAssignedPartition.getValue()); @@ -125,31 +125,32 @@ private List emitOneMessagePerPartitionThenRevokeOnePartiti @Test public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10), consumerFactoryMock); - String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; - TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); - TopicPartition assignedPartition = new TopicPartition(topic, 2); - - //Emit a message on each partition and revoke the first partition - List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); - - //Ack both emitted tuples - spout.ack(emittedMessageIds.get(0)); - spout.ack(emittedMessageIds.get(1)); - - //Ensure the commit timer has expired - Thread.sleep(510); - - //Make the spout commit any acked tuples - spout.nextTuple(); - //Verify that it only committed the message on the assigned partition - verify(consumerMock).commitSync(commitCapture.capture()); - - Map commitCaptureMap = commitCapture.getValue(); - assertThat(commitCaptureMap, hasKey(assignedPartition)); - assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked))); + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, this.offsetCommitPeriodMs), consumerFactoryMock); + 
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; + TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); + TopicPartition assignedPartition = new TopicPartition(topic, 2); + + //Emit a message on each partition and revoke the first partition + List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + + //Ack both emitted tuples + spout.ack(emittedMessageIds.get(0)); + spout.ack(emittedMessageIds.get(1)); + + //Ensure the commit timer has expired + Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Make the spout commit any acked tuples + spout.nextTuple(); + //Verify that it only committed the message on the assigned partition + verify(consumerMock, times(1)).commitSync(commitCapture.capture()); + + Map commitCaptureMap = commitCapture.getValue(); + assertThat(commitCaptureMap, hasKey(assignedPartition)); + assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked))); + } } - + @Test public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass @@ -158,14 +159,14 @@ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1); TopicPartition assignedPartition = new TopicPartition(topic, 2); - + //Emit a message on each partition and revoke the first partition List emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); - + //Fail both emitted tuples spout.fail(emittedMessageIds.get(0)); spout.fail(emittedMessageIds.get(1)); - + //Check that only the tuple on the currently assigned partition is retried verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0)); verify(retryServiceMock).schedule(emittedMessageIds.get(1)); diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index f457b5968ac..3ea2d69aeac 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -17,6 +17,8 @@ */ package org.apache.storm.kafka.spout; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig; + import info.batey.kafka.unit.KafkaUnitRule; import kafka.producer.KeyedMessage; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -28,29 +30,62 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -import static org.junit.Assert.*; - import java.util.Map; -import static org.mockito.Mockito.*; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*; - -public class SingleTopicKafkaSpoutTest { - private class SpoutContext { - public KafkaSpout spout; - public SpoutOutputCollector collector; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.HashMap; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; - public SpoutContext(KafkaSpout spout, - SpoutOutputCollector collector) { - this.spout = spout; - this.collector = collector; - } - } +public class SingleTopicKafkaSpoutTest { @Rule public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); - void populateTopicData(String topicName, int msgCount) { + @Captor + private ArgumentCaptor> commitCapture; + + private final TopologyContext topologyContext = mock(TopologyContext.class); + private final Map conf = new HashMap<>(); + private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); + private final long commitOffsetPeriodMs = 2_000; + private KafkaConsumer consumerSpy; + private KafkaConsumerFactory consumerFactory; + private KafkaSpout spout; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs); + this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + this.consumerFactory = new KafkaConsumerFactory() { + @Override + public KafkaConsumer createConsumer(KafkaSpoutConfig kafkaSpoutConfig) { + return consumerSpy; + } + + }; + this.spout = new KafkaSpout<>(spoutConfig, consumerFactory); + } + + private void populateTopicData(String topicName, int msgCount) { kafkaUnitRule.getKafkaUnit().createTopic(topicName); for (int i = 0; i < msgCount; i++){ @@ -62,184 +97,180 @@ void populateTopicData(String topicName, int msgCount) { }; } - SpoutContext initializeSpout(int msgCount) { + private void initializeSpout(int msgCount) { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); - int kafkaPort = kafkaUnitRule.getKafkaPort(); - - TopologyContext topology = mock(TopologyContext.class); - SpoutOutputCollector collector = mock(SpoutOutputCollector.class); - Map conf = mock(Map.class); - - KafkaSpout spout = new KafkaSpout<>(getKafkaSpoutConfig(kafkaPort)); - spout.open(conf, topology, collector); + spout.open(conf, topologyContext, collector); spout.activate(); - return new SpoutContext(spout, collector); } + /* - * Asserts that the next possible offset to commit or the committed offset is the provided offset. - * An offset that is ready to be committed is not guarenteed to be already committed. 
+ * Asserts that commitSync has been called once, + * that there are only commits on one topic, + * and that the committed offset covers messageCount messages */ - private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry entry) { - - boolean currentOffsetMatch = entry.getCommittedOffset() == offset; - OffsetAndMetadata nextOffset = entry.findNextCommitOffset(); - boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset; - assertTrue("Next offset: " + - entry.findNextCommitOffset() + - " OR current offset: " + - entry.getCommittedOffset() + - " must equal desired offset: " + - offset, - currentOffsetMatch | nextOffsetMatch); + private void verifyAllMessagesCommitted(long messageCount) { + verify(consumerSpy, times(1)).commitSync(commitCapture.capture()); + Map commits = commitCapture.getValue(); + assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1)); + OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1)); } @Test public void shouldContinueWithSlowDoubleAcks() throws Exception { - int messageCount = 20; - SpoutContext context = initializeSpout(messageCount); - - //play 1st tuple - ArgumentCaptor messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture()); - context.spout.ack(messageIdToDoubleAck.getValue()); - - for (int i = 0; i < messageCount/2; i++) { - context.spout.nextTuple(); - }; - - context.spout.ack(messageIdToDoubleAck.getValue()); - - for (int i = 0; i < messageCount; i++) { - context.spout.nextTuple(); - }; + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 20; + initializeSpout(messageCount); + + //play 1st tuple + ArgumentCaptor messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture()); + spout.ack(messageIdToDoubleAck.getValue()); + + //Emit some more messages + for(int i = 0; i < messageCount / 2; i++) { + spout.nextTuple(); + } + + spout.ack(messageIdToDoubleAck.getValue()); + + //Emit any remaining messages + for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + + //Verify that all messages are emitted, ack all the messages + ArgumentCaptor messageIds = ArgumentCaptor.forClass(Object.class); + verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), + anyList(), + messageIds.capture()); + for(Object id : messageIds.getAllValues()) { + spout.ack(id); + } - ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class); + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); - verify(context.collector, times(messageCount)).emit( - eq(SingleTopicKafkaSpoutConfiguration.STREAM), - anyList(), - remainingIds.capture()); - for (Object id : remainingIds.getAllValues()) { - context.spout.ack(id); + verifyAllMessagesCommitted(messageCount); } - - for(Object item : context.spout.acked.values()) { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }; } @Test public void shouldEmitAllMessages() throws Exception { - int messageCount = 10; - SpoutContext context = initializeSpout(messageCount); - - - for (int i = 0; i < messageCount; i++) { - context.spout.nextTuple(); - ArgumentCaptor messageId = 
ArgumentCaptor.forClass(Object.class); - verify(context.collector).emit( + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 10; + initializeSpout(messageCount); + + //Emit all messages and check that they are emitted. Ack the messages too + for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + ArgumentCaptor messageId = ArgumentCaptor.forClass(Object.class); + verify(collector).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, - Integer.toString(i), - Integer.toString(i))), - messageId.capture()); - context.spout.ack(messageId.getValue()); - reset(context.collector); - }; - - for (Object item : context.spout.acked.values()) { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }; + Integer.toString(i), + Integer.toString(i))), + messageId.capture()); + spout.ack(messageId.getValue()); + reset(collector); + } + + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } } @Test public void shouldReplayInOrderFailedMessages() throws Exception { - int messageCount = 10; - SpoutContext context = initializeSpout(messageCount); - - //play and ack 1 tuple - ArgumentCaptor messageIdAcked = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyString(), anyList(), messageIdAcked.capture()); - context.spout.ack(messageIdAcked.getValue()); - reset(context.collector); - - //play and fail 1 tuple - ArgumentCaptor messageIdFailed = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyString(), anyList(), messageIdFailed.capture()); - context.spout.fail(messageIdFailed.getValue()); - reset(context.collector); - - //pause so that failed tuples will be retried - Thread.sleep(200); - - - //allow for some calls to nextTuple() to fail to emit a tuple - for (int i = 0; i < messageCount + 5; i++) { - context.spout.nextTuple(); - }; - - ArgumentCaptor remainingMessageIds = ArgumentCaptor.forClass(Object.class); - - //1 message replayed, messageCount - 2 messages emitted for the first time - verify(context.collector, times(messageCount - 1)).emit( + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 10; + initializeSpout(messageCount); + + //play and ack 1 tuple + ArgumentCaptor messageIdAcked = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdAcked.capture()); + spout.ack(messageIdAcked.getValue()); + reset(collector); + + //play and fail 1 tuple + ArgumentCaptor messageIdFailed = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdFailed.capture()); + spout.fail(messageIdFailed.getValue()); + reset(collector); + + //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. 
+ for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + + ArgumentCaptor remainingMessageIds = ArgumentCaptor.forClass(Object.class); + //All messages except the first acked message should have been emitted + verify(collector, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), remainingMessageIds.capture()); - for (Object id : remainingMessageIds.getAllValues()) { - context.spout.ack(id); - } + for(Object id : remainingMessageIds.getAllValues()) { + spout.ack(id); + } - for (Object item : context.spout.acked.values()) { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }; + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } } @Test public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception { - int messageCount = 10; - SpoutContext context = initializeSpout(messageCount); - - - //play 1st tuple - ArgumentCaptor messageIdToFail = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyString(), anyList(), messageIdToFail.capture()); - reset(context.collector); - - //play 2nd tuple - ArgumentCaptor messageIdToAck = ArgumentCaptor.forClass(Object.class); - context.spout.nextTuple(); - verify(context.collector).emit(anyString(), anyList(), messageIdToAck.capture()); - reset(context.collector); - - //ack 2nd tuple - context.spout.ack(messageIdToAck.getValue()); - //fail 1st tuple - context.spout.fail(messageIdToFail.getValue()); - - //pause so that failed tuples will be retried - Thread.sleep(200); - - //allow for some calls to nextTuple() to fail to emit a tuple - for (int i = 0; i < messageCount + 5; i++) { - context.spout.nextTuple(); - }; - - ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class); - //1 message replayed, messageCount - 2 messages emitted for the first time - verify(context.collector, times(messageCount - 1)).emit( + try (SimulatedTime simulatedTime = new SimulatedTime()) { + int messageCount = 10; + initializeSpout(messageCount); + + //play 1st tuple + ArgumentCaptor messageIdToFail = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdToFail.capture()); + reset(collector); + + //play 2nd tuple + ArgumentCaptor messageIdToAck = ArgumentCaptor.forClass(Object.class); + spout.nextTuple(); + verify(collector).emit(anyString(), anyList(), messageIdToAck.capture()); + reset(collector); + + //ack 2nd tuple + spout.ack(messageIdToAck.getValue()); + //fail 1st tuple + spout.fail(messageIdToFail.getValue()); + + //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. 
+ for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + + ArgumentCaptor remainingIds = ArgumentCaptor.forClass(Object.class); + //All messages except the first acked message should have been emitted + verify(collector, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), remainingIds.capture()); - for (Object id : remainingIds.getAllValues()) { - context.spout.ack(id); - }; + for(Object id : remainingIds.getAllValues()) { + spout.ack(id); + } - for (Object item : context.spout.acked.values()) { - assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item); - }; + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } } -} \ No newline at end of file +} diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java index 6e59d420ae5..d49516e234c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java @@ -53,9 +53,9 @@ public static void main(String[] args) throws Exception { protected void runMain(String[] args) throws Exception { if (args.length == 0) { - submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig()); + submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig()); } else { - submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig()); + submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig()); } } @@ -86,7 +86,7 @@ protected Config getConfig() { return config; } - protected StormTopology getTopolgyKafkaSpout() { + protected StormTopology getTopologyKafkaSpout() { final TopologyBuilder tp = new TopologyBuilder(); tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()) diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java index 8b967fad484..cfc34465ac2 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception { new KafkaSpoutTopologyMainWildcardTopics().runMain(args); } - protected StormTopology getTopolgyKafkaSpout() { + protected StormTopology getTopologyKafkaSpout() { final TopologyBuilder tp = new TopologyBuilder(); tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM); diff --git a/pom.xml b/pom.xml index 7f4f965c6f3..a45903a8be0 100644 --- a/pom.xml +++ b/pom.xml @@ -338,7 +338,6 @@ junit junit - ${junit.version} test diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java index e501b6cd790..a6a4fe1322f 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Time.java +++ 
b/storm-core/src/jvm/org/apache/storm/utils/Time.java @@ -24,38 +24,67 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time. + * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls. + * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime(). + */ public class Time { public static final Logger LOG = LoggerFactory.getLogger(Time.class); private static AtomicBoolean simulating = new AtomicBoolean(false); - private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0); + private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0); //TODO: should probably use weak references here or something - private static volatile Map threadSleepTimes; + private static volatile Map threadSleepTimesNanos; private static final Object sleepTimesLock = new Object(); + private static AtomicLong simulatedCurrTimeNanos; - private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing? - - public static void startSimulating() { - synchronized(sleepTimesLock) { - simulating.set(true); - simulatedCurrTimeMs = new AtomicLong(0); - threadSleepTimes = new ConcurrentHashMap<>(); + public static class SimulatedTime implements AutoCloseable { + + public SimulatedTime() { + this(null); + } + + public SimulatedTime(Number advanceTimeMs) { + synchronized(Time.sleepTimesLock) { + Time.simulating.set(true); + Time.simulatedCurrTimeNanos = new AtomicLong(0); + Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); + if (advanceTimeMs != null) { + Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue())); + } + LOG.warn("AutoCloseable Simulated Time Starting..."); + } + } + + @Override + public void close() { + synchronized(Time.sleepTimesLock) { + Time.simulating.set(false); + Time.autoAdvanceNanosOnSleep.set(0); + Time.threadSleepTimesNanos = null; + LOG.warn("AutoCloseable Simulated Time Ending..."); + } } } - public static void startSimulatingAutoAdvanceOnSleep(long ms) { - synchronized(sleepTimesLock) { - startSimulating(); - autoAdvanceOnSleep.set(ms); + @Deprecated + public static void startSimulating() { + synchronized(Time.sleepTimesLock) { + Time.simulating.set(true); + Time.simulatedCurrTimeNanos = new AtomicLong(0); + Time.threadSleepTimesNanos = new ConcurrentHashMap<>(); + LOG.warn("Simulated Time Starting..."); } } + @Deprecated public static void stopSimulating() { - synchronized(sleepTimesLock) { - simulating.set(false); - autoAdvanceOnSleep.set(0); - threadSleepTimes = null; + synchronized(Time.sleepTimesLock) { + Time.simulating.set(false); + Time.autoAdvanceNanosOnSleep.set(0); + Time.threadSleepTimesNanos = null; + LOG.warn("Simulated Time Ending..."); } } @@ -65,44 +94,66 @@ public static boolean isSimulating() { public static void sleepUntil(long targetTimeMs) throws InterruptedException { if(simulating.get()) { - try { - synchronized(sleepTimesLock) { - if (threadSleepTimes == null) { + simulatedSleepUntilNanos(millisToNanos(targetTimeMs)); + } else { + long sleepTimeMs = targetTimeMs - currentTimeMillis(); + if(sleepTimeMs>0) { + Thread.sleep(sleepTimeMs); + } + } + } + + public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException { + if(simulating.get()) { + 
simulatedSleepUntilNanos(targetTimeNanos); + } else { + long sleepTimeNanos = targetTimeNanos-nanoTime(); + long sleepTimeMs = nanosToMillis(sleepTimeNanos); + int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000); + if(sleepTimeNanos>0) { + Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs); + } + } + } + + private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException { + try { + synchronized (sleepTimesLock) { + if (threadSleepTimesNanos == null) { + LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); + throw new InterruptedException(); + } + threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos)); + } + while (simulatedCurrTimeNanos.get() < targetTimeNanos) { + synchronized (sleepTimesLock) { + if (threadSleepTimesNanos == null) { LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); throw new InterruptedException(); } - threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs)); } - while(simulatedCurrTimeMs.get() < targetTimeMs) { - synchronized(sleepTimesLock) { - if (threadSleepTimes == null) { - LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE")); - throw new InterruptedException(); - } - } - long autoAdvance = autoAdvanceOnSleep.get(); - if (autoAdvance > 0) { - advanceTime(autoAdvance); - } - Thread.sleep(10); + long autoAdvance = autoAdvanceNanosOnSleep.get(); + if (autoAdvance > 0) { + advanceTimeNanos(autoAdvance); } - } finally { - synchronized(sleepTimesLock) { - if (simulating.get() && threadSleepTimes != null) { - threadSleepTimes.remove(Thread.currentThread()); - } + Thread.sleep(10); + } + } finally { + synchronized (sleepTimesLock) { + if (simulating.get() && threadSleepTimesNanos != null) { + threadSleepTimesNanos.remove(Thread.currentThread()); } } - } else { - long sleepTime = targetTimeMs-currentTimeMillis(); - if(sleepTime>0) - Thread.sleep(sleepTime); } } public static void sleep(long ms) throws InterruptedException { sleepUntil(currentTimeMillis()+ms); } + + public static void sleepNanos(long nanos) throws InterruptedException { + sleepUntilNanos(nanoTime() + nanos); + } public static void sleepSecs (long secs) throws InterruptedException { if (secs > 0) { @@ -110,14 +161,30 @@ public static void sleepSecs (long secs) throws InterruptedException { } } + public static long nanoTime() { + if (simulating.get()) { + return simulatedCurrTimeNanos.get(); + } else { + return System.nanoTime(); + } + } + public static long currentTimeMillis() { if(simulating.get()) { - return simulatedCurrTimeMs.get(); + return nanosToMillis(simulatedCurrTimeNanos.get()); } else { return System.currentTimeMillis(); } } + public static long nanosToMillis(long nanos) { + return nanos/1_000_000; + } + + public static long millisToNanos(long millis) { + return millis*1_000_000; + } + public static long secsToMillis (int secs) { return 1000*(long) secs; } @@ -139,18 +206,32 @@ public static long deltaMs(long timeInMilliseconds) { } public static void advanceTime(long ms) { - if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode"); - if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); - long newTime = simulatedCurrTimeMs.addAndGet(ms); - LOG.warn("Advanced simulated time to {}", newTime); + 
advanceTimeNanos(millisToNanos(ms)); + } + + public static void advanceTimeNanos(long nanos) { + if (!simulating.get()) { + throw new IllegalStateException("Cannot simulate time unless in simulation mode"); + } + if (nanos < 0) { + throw new IllegalArgumentException("advanceTime only accepts positive time as an argument"); + } + long newTime = simulatedCurrTimeNanos.addAndGet(nanos); + LOG.debug("Advanced simulated time to {}", newTime); + } + + public static void advanceTimeSecs(long secs) { + advanceTime(secs * 1_000); } public static boolean isThreadWaiting(Thread t) { - if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode"); + if(!simulating.get()) { + throw new IllegalStateException("Must be in simulation mode"); + } AtomicLong time; synchronized(sleepTimesLock) { - time = threadSleepTimes.get(t); + time = threadSleepTimesNanos.get(t); } - return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue(); + return !t.isAlive() || time!=null && nanoTime() < time.longValue(); } } diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java index 24ccda5e3f8..9cd85f82970 100644 --- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java +++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java @@ -43,6 +43,7 @@ import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Test; public class SlotTest { @@ -113,8 +114,7 @@ public void testEquivilant() { @Test public void testEmptyToEmpty() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { ILocalizer localizer = mock(ILocalizer.class); LocalState state = mock(LocalState.class); ContainerLauncher containerLauncher = mock(ContainerLauncher.class); @@ -125,15 +125,12 @@ public void testEmptyToEmpty() throws Exception { DynamicState nextState = Slot.handleEmpty(dynamicState, staticState); assertEquals(MachineState.EMPTY, nextState.state); assertTrue(Time.currentTimeMillis() > 1000); - } finally { - Time.stopSimulating(); } } @Test public void testLaunchContainerFromEmpty() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String topoId = "NEW"; List execList = mkExecutorInfoList(1,2,3,4,5); @@ -210,16 +207,13 @@ public void testLaunchContainerFromEmpty() throws Exception { assertSame(newAssignment, nextState.currentAssignment); assertSame(container, nextState.container); assertTrue(Time.currentTimeMillis() > 2000); - } finally { - Time.stopSimulating(); } } @Test public void testRelaunch() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String topoId = "CURRENT"; List execList = mkExecutorInfoList(1,2,3,4,5); @@ -260,15 +254,12 @@ public void testRelaunch() throws Exception { nextState = Slot.stateMachineStep(nextState, staticState); assertEquals(MachineState.RUNNING, nextState.state); - } finally { - Time.stopSimulating(); } } @Test public void testReschedule() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; List cExecList = 
mkExecutorInfoList(1,2,3,4,5); @@ -368,16 +359,13 @@ public void testReschedule() throws Exception { assertSame(nAssignment, nextState.currentAssignment); assertSame(nContainer, nextState.container); assertTrue(Time.currentTimeMillis() > 4000); - } finally { - Time.stopSimulating(); } } @Test public void testRunningToEmpty() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; List cExecList = mkExecutorInfoList(1,2,3,4,5); @@ -432,15 +420,12 @@ public void testRunningToEmpty() throws Exception { assertEquals(null, nextState.container); assertEquals(null, nextState.currentAssignment); assertTrue(Time.currentTimeMillis() > 3000); - } finally { - Time.stopSimulating(); } } @Test public void testRunWithProfileActions() throws Exception { - Time.startSimulatingAutoAdvanceOnSleep(1010); - try { + try (SimulatedTime simulatedTime = new SimulatedTime(1010)) { int port = 8080; String cTopoId = "CURRENT"; List cExecList = mkExecutorInfoList(1,2,3,4,5); @@ -508,8 +493,6 @@ public void testRunWithProfileActions() throws Exception { assertEquals(Collections. emptySet(), nextState.pendingStopProfileActions); assertEquals(Collections. emptySet(), nextState.profileActions); assertTrue(Time.currentTimeMillis() > 5000); - } finally { - Time.stopSimulating(); } } }
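Two short Java sketches of the APIs this patch introduces, for illustration only. They are hedged sketches, not part of the diff: the class name OffsetManagerSketch, the literal topic/offset values, and the surrounding test fixtures are assumptions, while OffsetManager, KafkaSpoutMessageId, Time.SimulatedTime, Time.advanceTime and KafkaSpout.TIMER_DELAY_MS are names actually added or used above.

First, a minimal sketch of how the new OffsetManager decides what is safe to commit, and how commit() now reports the number of committed offsets (which KafkaSpout uses to decrement numUncommittedOffsets):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
    import org.apache.storm.kafka.spout.internal.OffsetManager;

    public class OffsetManagerSketch {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("test", 0);
            OffsetManager manager = new OffsetManager(tp, 0);   // first fetch offset 0, so committedOffset starts at -1

            // Ack offsets 0 and 2; offset 1 is still outstanding
            manager.add(new KafkaSpoutMessageId(new ConsumerRecord<>("test", 0, 0L, "key", "value")));
            manager.add(new KafkaSpoutMessageId(new ConsumerRecord<>("test", 0, 2L, "key", "value")));

            // Only offset 0 is contiguous with the committed offset, so only it is ready to commit
            OffsetAndMetadata next = manager.findNextCommitOffset();

            // commit() returns how many offsets were committed (1 here) and drops them from the acked set
            long numCommitted = manager.commit(next);
            System.out.println(numCommitted + " offset(s) committed, committedOffset=" + manager.getCommittedOffset());
        }
    }

Second, a condensed version of the test pattern the patch switches to: simulated time replaces the Thread.sleep() calls, so the commit timer can be expired deterministically. The fragment assumes the mocked consumer, collector, config and captor fields set up in these tests, plus an already captured emittedMessageId:

    try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
        KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
        spout.open(conf, contextMock, collectorMock);
        spout.activate();

        spout.nextTuple();               // emit a record from the mocked consumer
        spout.ack(emittedMessageId);     // ack it so it becomes eligible for commit

        // Jump past the commit period plus the timer's initial delay instead of sleeping
        Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS);

        spout.nextTuple();               // the expired commit timer triggers commitSync
        verify(consumerMock, times(1)).commitSync(commitCapture.capture());
    }

Because Timer now reads Time.nanoTime() rather than System.nanoTime(), advancing the simulated clock is enough to expire the commit and subscription refresh timers, which is why the 500 ms initial delay is exposed as KafkaSpout.TIMER_DELAY_MS.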