From e25e176b5ebe4d94deb44c377832153dc393cdea Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 13 Oct 2021 21:02:43 +0800 Subject: [PATCH] Fix message being ignored when the non-persistent topic reader reconnect. Signed-off-by: Zike Yang --- .../pulsar/client/impl/ConsumerImpl.java | 6 +- .../messaging/ReaderMessagingTest.java | 142 ++++++++++++++++++ 2 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 97fb13f89fa35..b29d6c73deff6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1026,7 +1026,8 @@ protected MessageImpl newSingleMessage(final int index, Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages); } - if (isSameEntry(messageId) && isPriorBatchIndex(index)) { + // If the topic is non-persistent, we should not ignore any messages. + if (this.topicName.isPersistent() && isSameEntry(messageId) && isPriorBatchIndex(index)) { // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId if (log.isDebugEnabled()) { @@ -1199,7 +1200,8 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List ac } } - if (isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) { + // If the topic is non-persistent, we should not ignore any messages. + if (this.topicName.isPersistent() && isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) { // We need to discard entries that were prior to startMessageId if (log.isDebugEnabled()) { log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java new file mode 100644 index 0000000000000..cdae130edd910 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/ReaderMessagingTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.messaging; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.testng.annotations.Test; + +@Slf4j +public class ReaderMessagingTest extends MessagingBase { + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testReaderReconnectAndRead(Supplier serviceUrl, Supplier adminUrl) throws Exception { + log.info("-- Starting {} test --", methodName); + final String topicName = getNonPartitionedTopic("test-reader-reconnect-read", false); + @Cleanup final PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl.get()) + .build(); + @Cleanup final Reader reader = client.newReader(Schema.STRING) + .topic(topicName) + .subscriptionName("test-sub") + // Here we need to make sure that setting the startMessageId should not cause a change in the + // behavior of the reader under non. + .startMessageId(MessageId.earliest) + .create(); + + final int messagesToSend = 10; + @Cleanup final Producer producer = client.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + for (int i = 0; i < messagesToSend; i++) { + MessageId messageId = producer.newMessage().value("message-" + i).send(); + assertNotNull(messageId); + } + + for (int i = 0; i < messagesToSend; i++) { + Message msg = reader.readNext(); + assertEquals(msg.getValue(), "message-" + i); + } + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl.get()) + .build(); + + admin.topics().unload(topicName); + + for (int i = 0; i < messagesToSend; i++) { + MessageId messageId = producer.newMessage().value("message-" + i).send(); + assertNotNull(messageId); + } + + for (int i = 0; i < messagesToSend; i++) { + Message msg = reader.readNext(); + assertEquals(msg.getValue(), "message-" + i); + } + + log.info("-- Exiting {} test --", methodName); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testReaderReconnectAndReadBatchMessages(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + log.info("-- Starting {} test --", methodName); + final String topicName = getNonPartitionedTopic("test-reader-reconnect-read-batch", false); + @Cleanup final PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl.get()) + .build(); + @Cleanup final Reader reader = client.newReader(Schema.STRING) + .topic(topicName) + .subscriptionName("test-sub") + // Here we need to make sure that setting the startMessageId should not cause a change in the + // behavior of the reader under non. + .startMessageId(MessageId.earliest) + .create(); + + final int messagesToSend = 10; + @Cleanup final Producer producer = client.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(true) + .batchingMaxPublishDelay(5, TimeUnit.SECONDS) + .batchingMaxMessages(5) + .create(); + + for (int i = 0; i < messagesToSend; i++) { + MessageId messageId = producer.newMessage().value("message-" + i).send(); + assertNotNull(messageId); + } + + for (int i = 0; i < messagesToSend; i++) { + Message msg = reader.readNext(); + assertEquals(msg.getValue(), "message-" + i); + } + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl.get()) + .build(); + + admin.topics().unload(topicName); + + for (int i = 0; i < messagesToSend; i++) { + MessageId messageId = producer.newMessage().value("message-" + i).send(); + assertNotNull(messageId); + } + + for (int i = 0; i < messagesToSend; i++) { + Message msg = reader.readNext(); + assertEquals(msg.getValue(), "message-" + i); + } + + log.info("-- Exiting {} test --", methodName); + } +}