Skip to content

Commit

Permalink
[fix][broker] Avoid consumers receiving acknowledged messages from co…
Browse files Browse the repository at this point in the history
…mpacted topic after reconnection (apache#21187)

(cherry picked from commit 24d8d9a)
  • Loading branch information
coderzc authored and mukesh-ctds committed Feb 29, 2024
1 parent 24521de commit c8c4e23
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ void markDelete(Position position, Map<String, Long> properties)
*/
void rewind();

/**
 * Variant of {@link #rewind()} that is told whether the caller reads the topic in compacted mode.
 *
 * <p>This default implementation ignores {@code readCompacted} and simply delegates to
 * {@link #rewind()}; implementations may override it to choose a different read position.
 *
 * @param readCompacted the {@code readCompacted} setting of the consumer on whose behalf the cursor is rewound
 */
default void rewind(boolean readCompacted) {
rewind();
}

/**
* Move the cursor to a different read position.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
if (!ledger.ledgerExists(position.getLedgerId())) {
if (position.getEntryId() == -1L && !ledger.ledgerExists(position.getLedgerId())) {
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
Expand Down Expand Up @@ -2499,9 +2499,15 @@ public Position getPersistentMarkDeletedPosition() {

/**
 * Rewinds the cursor without compacted-read handling; equivalent to calling {@code rewind(false)}.
 */
@Override
public void rewind() {
// Delegate to the flag-taking overload so both entry points share one implementation.
rewind(false);
}

@Override
public void rewind(boolean readCompacted) {
lock.writeLock().lock();
try {
PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
PositionImpl newReadPosition =
readCompacted ? markDeletePosition.getNext() : ledger.getNextValidPosition(markDeletePosition);
PositionImpl oldReadPosition = readPosition;

log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.TopicCompactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -108,9 +109,9 @@ protected void scheduleReadOnActiveConsumer() {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries without delay", name);
}
cursor.rewind();

Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());

notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
return;
Expand All @@ -128,9 +129,9 @@ protected void scheduleReadOnActiveConsumer() {
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
cursor.rewind();

Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());

notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
Expand Down Expand Up @@ -201,7 +202,7 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,
log.debug("[{}] rewind because no available consumer found", name);
}
entries.forEach(Entry::release);
cursor.rewind();
cursor.rewind(currentConsumer != null ? currentConsumer.readCompacted() : readConsumer.readCompacted());
if (currentConsumer != null) {
notifyActiveConsumerChanged(currentConsumer);
readMoreEntries(currentConsumer);
Expand Down Expand Up @@ -296,7 +297,7 @@ private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consu
}
cursor.cancelPendingReadRequest();
havePendingRead = false;
cursor.rewind();
cursor.rewind(consumer.readCompacted());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
}
Expand Down Expand Up @@ -350,7 +351,9 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
&& (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
|| hasValidMarkDeletePosition(cursor));
TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
Expand All @@ -368,6 +371,13 @@ protected void readMoreEntries(Consumer consumer) {
}
}

/**
 * Tells whether the cursor's mark-delete position still carries the entry id {@code -1L}.
 *
 * <p>An entry id of {@code -1L} does not designate a real entry: it is the value the initial mark-delete
 * position of a consumer is set to (see ManagedLedgerImpl#asyncOpenCursor and
 * ManagedLedgerImpl#getFirstPosition).
 *
 * <p>NOTE(review): despite the method name, {@code true} is returned for exactly that initial
 * ({@code -1L}) position -- presumably meaning "nothing has been acknowledged yet"; confirm the intended
 * naming against the caller before relying on it.
 *
 * @param cursor the cursor whose mark-delete position is inspected
 * @return {@code true} if the mark-delete position is non-null and its entry id equals {@code -1L}
 */
private boolean hasValidMarkDeletePosition(ManagedCursor cursor) {
    // No mark-delete position at all: nothing to compare against.
    if (cursor.getMarkDeletedPosition() == null) {
        return false;
    }
    return cursor.getMarkDeletedPosition().getEntryId() == -1L;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,9 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicated,
Map<String, String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
Expand All @@ -988,7 +990,6 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti
}

Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);

ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
new OpenCursorCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
PositionImpl cursorPosition;
if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
&& (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
|| cursor.getMarkDeletedPosition() == null
|| cursor.getMarkDeletedPosition().getEntryId() == -1L);
if (readFromEarliest){
cursorPosition = PositionImpl.EARLIEST;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -868,6 +869,7 @@ public void testReplicatedSubscriptionWithCompaction() throws Exception {
.topic(topicName)
.subscriptionName("sub2")
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1838,4 +1838,63 @@ public void testReadCommittedWithReadCompacted() throws Exception{
Assert.assertEquals(messages, List.of("V2", "V3"));
}


/**
 * Publishes plain and transactional messages for keys K1..K3 twice, compacts the topic, and then checks
 * that a new exclusive {@code readCompacted} consumer starting from the earliest position receives only
 * the most recent value of each key.
 */
@Test
public void testReadCommittedWithCompaction() throws Exception{
    final String ns = "tnx/ns-prechecks";
    final String topicName = "persistent://" + ns + "/test_transaction_topic" + UUID.randomUUID();
    admin.namespaces().createNamespace(ns);
    admin.topics().createNonPartitionedTopic(topicName);

    // Large threshold so that compaction only runs when it is triggered explicitly below.
    admin.topicPolicies().setCompactionThreshold(topicName, 100 * 1024 * 1024);

    @Cleanup
    Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING).topic(topicName).create();

    // First generation of values: K1 outside a transaction, K2/K3 inside a committed transaction.
    producer.newMessage().key("K1").value("V1").send();

    Transaction firstTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(1, TimeUnit.MINUTES)
            .build()
            .get();
    producer.newMessage(firstTxn).key("K2").value("V2").send();
    producer.newMessage(firstTxn).key("K3").value("V3").send();
    firstTxn.commit().get();

    // Second generation of values overriding every key.
    producer.newMessage().key("K1").value("V4").send();

    Transaction secondTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(1, TimeUnit.MINUTES)
            .build()
            .get();
    producer.newMessage(secondTxn).key("K2").value("V5").send();
    producer.newMessage(secondTxn).key("K3").value("V6").send();
    secondTxn.commit().get();

    // Compact and wait until the compaction run reports success.
    admin.topics().triggerCompaction(topicName);
    Awaitility.await().untilAsserted(() ->
            assertEquals(admin.topics().compactionStatus(topicName).status,
                    LongRunningProcessStatus.Status.SUCCESS));

    @Cleanup
    Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
            .topic(topicName)
            .subscriptionName("sub")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .readCompacted(true)
            .subscribe();

    // Drain the subscription until a receive times out.
    List<String> values = new ArrayList<>();
    Message<String> received = consumer.receive(2, TimeUnit.SECONDS);
    while (received != null) {
        values.add(received.getValue());
        received = consumer.receive(2, TimeUnit.SECONDS);
    }

    Assert.assertEquals(values, List.of("V4", "V5", "V6"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -733,4 +733,32 @@ public void testReaderListenerAcknowledgement()
admin.topics().deletePartitionedTopic(partitionedTopic);
}

/**
 * Checks that a reader which has already consumed the first message continues with the following
 * messages, in order, after the topic is unloaded and the reader reconnects.
 */
@Test
public void testReaderReconnectedFromNextEntry() throws Exception {
    final String topicName = "persistent://my-property/my-ns/testReaderReconnectedFromNextEntry";
    Reader<String> topicReader = pulsarClient.newReader(Schema.STRING)
            .topic(topicName)
            .receiverQueueSize(1)
            .startMessageId(MessageId.earliest)
            .create();
    Producer<String> topicProducer = pulsarClient.newProducer(Schema.STRING)
            .topic(topicName)
            .create();

    // Publish three messages but read only the first one before the reconnect.
    for (String payload : new String[] {"1", "2", "3"}) {
        topicProducer.send(payload);
    }
    Message<String> first = topicReader.readNext(2, TimeUnit.SECONDS);
    assertEquals(first.getValue(), "1");

    // Unloading the topic forces the reader to reconnect.
    admin.topics().unload(topicName);

    // The (non-durable) reader is expected to resume from the entry after the one already read.
    for (String expected : new String[] {"2", "3"}) {
        Message<String> next = topicReader.readNext(2, TimeUnit.SECONDS);
        assertEquals(next.getValue(), expected);
    }

    // Release client resources, then remove the topic.
    topicReader.close();
    topicProducer.close();
    admin.topics().delete(topicName, false);
}
}
Loading

0 comments on commit c8c4e23

Please sign in to comment.