diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index 9e7514987bf56..c90b01f497058 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -66,7 +66,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback MessageImpl msg = null; try { msg = MessageImpl.deserialize(entry.getDataBuffer()); - return msg.getPublishTime() <= timestamp; + return msg.getPublishTime() < timestamp; } catch (Exception e) { log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e); } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1b0ae9f77ca07..5e6a2135fdd92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -654,7 +654,7 @@ public void findEntryComplete(Position position, Object ctx) { "[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger", topicName, subName, timestamp, finalPosition); } else { - finalPosition = position; + finalPosition = position.getNext(); } resetCursor(finalPosition, future); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 55cfc13b22ea7..61d8cd0d58df3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1593,14 +1593,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception { int receivedAfterReset = 0; - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(message.getData(), expected.getBytes()); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); consumer.close(); @@ -1652,29 +1652,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep int receivedAfterReset = 0; - // Should received messages from 4-9 - for (int i = 4; i < 10; i++) { + // Should received messages from 5-9 + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); // Reset at 2nd timestamp receivedAfterReset = 0; admin.topics().resetCursor(topicName, "my-sub", secondTimestamp); - // Should received messages from 7-9 - for (int i = 7; i < 10; i++) { + // Should received messages from 8-9 + for (int i = 8; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 3); + assertEquals(receivedAfterReset, 2); consumer.close(); admin.topics().deleteSubscription(topicName, "my-sub"); @@ -1722,14 +1722,14 @@ public void persistentTopicsCursorResetAndFailover() throws Exception { .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); int receivedAfterReset = 0; - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumerA.receive(5, TimeUnit.SECONDS); consumerA.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(message.getData(), expected.getBytes()); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); // Closing consumerA activates consumerB consumerA.close(); @@ -1785,7 +1785,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception { Set expectedMessages = Sets.newHashSet(); Set receivedMessages = Sets.newHashSet(); - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); expectedMessages.add("message-" + i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index eec4e0865dc8b..9d765ec405b1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -1487,14 +1487,14 @@ public void persistentTopicsCursorReset(String topicName) throws Exception { int receivedAfterReset = 0; - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(message.getData(), expected.getBytes()); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); consumer.close(); @@ -1546,29 +1546,29 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep int receivedAfterReset = 0; - // Should received messages from 4-9 - for (int i = 4; i < 10; i++) { + // Should received messages from 5-9 + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 6); + assertEquals(receivedAfterReset, 5); // Reset at 2nd timestamp receivedAfterReset = 0; admin.topics().resetCursor(topicName, "my-sub", secondTimestamp); - // Should received messages from 7-9 - for (int i = 7; i < 10; i++) { + // Should received messages from 8-9 + for (int i = 8; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); ++receivedAfterReset; String expected = "message-" + i; assertEquals(new String(message.getData()), expected); } - assertEquals(receivedAfterReset, 3); + assertEquals(receivedAfterReset, 2); consumer.close(); admin.topics().deleteSubscription(topicName, "my-sub"); @@ -1611,7 +1611,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception { Set expectedMessages = Sets.newHashSet(); Set receivedMessages = Sets.newHashSet(); - for (int i = 4; i < 10; i++) { + for (int i = 5; i < 10; i++) { Message message = consumer.receive(); consumer.acknowledge(message); expectedMessages.add("message-" + i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index c9cc07ccaed00..9ea1e0cca07a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -135,7 +135,7 @@ public void testSeekTime() throws Exception { long currentTimestamp = System.currentTimeMillis(); consumer.seek(currentTimestamp); - assertEquals(sub.getNumberOfEntriesInBacklog(false), 1); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); // Wait for consumer to reconnect Thread.sleep(1000); @@ -187,7 +187,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception { for (PersistentSubscription sub : subs) { backlogs += sub.getNumberOfEntriesInBacklog(false); } - assertEquals(backlogs, 2); + assertEquals(backlogs, 0); // Wait for consumer to reconnect Thread.sleep(1000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 8aa1bcdaa917c..552f69db57609 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -754,7 +754,7 @@ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception { reader.seek(l + plusTime); Set messageSet = Sets.newHashSet(); - for (int i = halfMessages; i < numOfMessage; i++) { + for (int i = halfMessages + 1; i < numOfMessage; i++) { Message message = reader.readNext(); String receivedMessage = new String(message.getData()); String expectedMessage = String.format("msg num %d", i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index e0a0fe28a78c1..0f801d7356d71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -245,8 +245,8 @@ public void testReaderWithTimeLong() throws Exception { receivedMessageIds.add(msg.getMessageId()); } - assertEquals(receivedMessageIds.size(), totalMsg + 1); - assertEquals(receivedMessageIds.get(0), lastMsgId); + assertEquals(receivedMessageIds.size(), totalMsg); + assertEquals(receivedMessageIds.get(0), firstMsgId); restartBroker();