Skip to content

Commit

Permalink
[fix][broker] Avoid consumers receiving acknowledged messages from co…
Browse files Browse the repository at this point in the history
…mpacted topic after reconnection (#21187)
  • Loading branch information
coderzc authored and Technoboy- committed Jan 27, 2024
1 parent 0222d97 commit 449c72b
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ void markDelete(Position position, Map<String, Long> properties)
*/
void rewind();

default void rewind(boolean readCompacted) {
rewind();
}

/**
* Move the cursor to a different read position.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
if (!ledger.ledgerExists(position.getLedgerId())) {
if (position.getEntryId() == -1L && !ledger.ledgerExists(position.getLedgerId())) {
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
Expand Down Expand Up @@ -2513,9 +2513,15 @@ public Position getPersistentMarkDeletedPosition() {

@Override
public void rewind() {
rewind(false);
}

@Override
public void rewind(boolean readCompacted) {
lock.writeLock().lock();
try {
PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
PositionImpl newReadPosition =
readCompacted ? markDeletePosition.getNext() : ledger.getNextValidPosition(markDeletePosition);
PositionImpl oldReadPosition = readPosition;

log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TopicCompactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,9 +108,9 @@ protected void scheduleReadOnActiveConsumer() {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries without delay", name);
}
cursor.rewind();

Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());

notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
return;
Expand All @@ -127,9 +128,9 @@ protected void scheduleReadOnActiveConsumer() {
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
cursor.rewind();

Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());

notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
Expand Down Expand Up @@ -206,7 +207,7 @@ private synchronized void internalReadEntriesComplete(final List<Entry> entries,
}
}
entries.forEach(Entry::release);
cursor.rewind();
cursor.rewind(currentConsumer != null ? currentConsumer.readCompacted() : readConsumer.readCompacted());
if (currentConsumer != null) {
notifyActiveConsumerChanged(currentConsumer);
readMoreEntries(currentConsumer);
Expand Down Expand Up @@ -301,7 +302,7 @@ private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consu
}
cursor.cancelPendingReadRequest();
havePendingRead = false;
cursor.rewind();
cursor.rewind(consumer.readCompacted());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
}
Expand Down Expand Up @@ -360,7 +361,9 @@ private void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
&& (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
|| hasValidMarkDeletePosition(cursor));
TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
Expand All @@ -378,6 +381,13 @@ private void readMoreEntries(Consumer consumer) {
}
}

private boolean hasValidMarkDeletePosition(ManagedCursor cursor) {
// If `markDeletedPosition.entryID == -1L` then the md-position is an invalid position,
// since the initial md-position of the consumer will be set to it.
// See ManagedLedgerImpl#asyncOpenCursor and ManagedLedgerImpl#getFirstPosition
return cursor.getMarkDeletedPosition() != null && cursor.getMarkDeletedPosition().getEntryId() == -1L;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,9 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicated,
Map<String, String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
Expand All @@ -1017,7 +1019,6 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti
}

Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);

ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
new OpenCursorCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
PositionImpl cursorPosition;
if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
&& (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
|| cursor.getMarkDeletedPosition() == null
|| cursor.getMarkDeletedPosition().getEntryId() == -1L);
if (readFromEarliest){
cursorPosition = PositionImpl.EARLIEST;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -868,6 +869,7 @@ public void testReplicatedSubscriptionWithCompaction() throws Exception {
.topic(topicName)
.subscriptionName("sub2")
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1893,6 +1893,7 @@ public void testReadCommittedWithCompaction() throws Exception{
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,4 +785,32 @@ public void testReaderListenerAcknowledgement()
admin.topics().deletePartitionedTopic(partitionedTopic);
}

@Test
public void testReaderReconnectedFromNextEntry() throws Exception {
final String topic = "persistent://my-property/my-ns/testReaderReconnectedFromNextEntry";
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
.startMessageId(MessageId.earliest).create();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();

// Send 3 and consume 1.
producer.send("1");
producer.send("2");
producer.send("3");
Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg1.getValue(), "1");

// Trigger reader reconnect.
admin.topics().unload(topic);

// For non-durable we are going to restart from the next entry.
Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg2.getValue(), "2");
Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg3.getValue(), "3");

// cleanup.
reader.close();
producer.close();
admin.topics().delete(topic, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,7 @@ public void testReceiverQueueSize() throws Exception {

ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

//Give some time to consume
Expand Down Expand Up @@ -1918,6 +1919,7 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Awaitility.await().untilAsserted(() -> {
Expand Down Expand Up @@ -2190,9 +2192,11 @@ public void testCompactionWithTTL() throws Exception {
});

@Cleanup
Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
.subscribe();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName("sub-2")
.readCompacted(true)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

List<String> result = new ArrayList<>();
while (true) {
Expand All @@ -2206,4 +2210,158 @@ public void testCompactionWithTTL() throws Exception {

Assert.assertEquals(result, List.of("V3", "V4", "V5"));
}

@Test
public void testAcknowledgeWithReconnection() throws Exception {
final String topicName = "persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
final String subName = "my-sub";
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

List<String> expected = new ArrayList<>();
for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
expected.add(String.valueOf(i));
}
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

// trim the topic
admin.topics().unload(topicName);

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
assertEquals(internalStats.numberOfEntries, 0);
});

ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.isAckReceiptEnabled(true)
.subscribe();

List<String> results = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
if (message == null) {
break;
}
results.add(message.getValue());
consumer.acknowledge(message);
}

Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
5));

// Make consumer reconnect to broker
admin.topics().unload(topicName);

// Wait for consumer to reconnect and clear incomingMessages
consumer.pause();
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.numMessagesInQueue(), 0);
});
consumer.resume();

for (int i = 0; i < 5; i++) {
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
if (message == null) {
break;
}
results.add(message.getValue());
consumer.acknowledge(message);
}

Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
0));

Assert.assertEquals(results, expected);

Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNull(message);

// Make consumer reconnect to broker
admin.topics().unload(topicName);

producer.newMessage().key("K").value("V").send();
Message<String> message2 = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertEquals(message2.getValue(), "V");
consumer.acknowledge(message2);

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
Assert.assertEquals(internalStats.lastConfirmedEntry,
internalStats.cursors.get(subName).markDeletePosition);
});

consumer.close();
producer.close();
}

@Test
public void testEarliestSubsAfterRollover() throws Exception {
final String topicName = "persistent://my-property/use/my-ns/testEarliestSubsAfterRollover" + UUID.randomUUID();
final String subName = "my-sub";
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

List<String> expected = new ArrayList<>();
for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
expected.add(String.valueOf(i));
}
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

// trim the topic
admin.topics().unload(topicName);

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
assertEquals(internalStats.numberOfEntries, 0);
});

// Make ml.getFirstPosition() return new ledger first position
producer.newMessage().key("K").value("V").send();
expected.add("V");

@Cleanup
ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.isAckReceiptEnabled(true)
.subscribe();

List<String> results = new ArrayList<>();
while (true) {
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
if (message == null) {
break;
}

results.add(message.getValue());
consumer.acknowledge(message);
}

Assert.assertEquals(results, expected);
}
}

0 comments on commit 449c72b

Please sign in to comment.