From b54c5aaf85193cb71757e9de25419f864bd68c55 Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Sat, 20 Jan 2024 09:35:56 +0800 Subject: [PATCH] [fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600) --- .../admin/impl/PersistentTopicsBase.java | 60 +++++++++++++---- .../pulsar/compaction/CompactedTopicImpl.java | 50 ++++++++++++++ .../PulsarTopicCompactionService.java | 56 +--------------- .../broker/admin/PersistentTopicsTest.java | 67 +++++++++++++++++++ 4 files changed, 166 insertions(+), 67 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4856e524bf01b..0dbd4f8442b35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2882,28 +2882,62 @@ protected CompletableFuture internalGetMessageIdByTimestampAsync(long throw new RestException(Status.METHOD_NOT_ALLOWED, "Get message ID by timestamp on a non-persistent topic is not allowed"); } - ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger(); - return ledger.asyncFindPosition(entry -> { + final PersistentTopic persistentTopic = (PersistentTopic) topic; + + return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> { + if (lastEntry == null) { + return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger()); + } + MessageMetadata metadata; + Position position = lastEntry.getPosition(); try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); - return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); - } catch (Exception e) { - log.error("[{}] Error deserializing message for message position find", topicName, e); + metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer()); } finally { - entry.release(); + lastEntry.release(); } - return false; - }).thenApply(position -> { - if (position == null) { - return null; + if (timestamp == metadata.getPublishTime()) { + return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(), + position.getEntryId(), topicName.getPartitionIndex())); + } else if (timestamp < metadata.getPublishTime()) { + return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp) + .thenApply(compactedEntry -> { + try { + return new MessageIdImpl(compactedEntry.getLedgerId(), + compactedEntry.getEntryId(), topicName.getPartitionIndex()); + } finally { + compactedEntry.release(); + } + }); } else { - return new MessageIdImpl(position.getLedgerId(), position.getEntryId(), - topicName.getPartitionIndex()); + return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger()); } }); }); } + private CompletableFuture findMessageIdByPublishTime(long timestamp, ManagedLedger managedLedger) { + return managedLedger.asyncFindPosition(entry -> { + try { + long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); + } catch (Exception e) { + log.error("[{}] Error deserializing message for message position find", + topicName, + e); + } finally { + entry.release(); + } + return false; + }).thenApply(position -> { + if (position == null) { + return null; + } else { + return new MessageIdImpl(position.getLedgerId(), position.getEntryId(), + topicName.getPartitionIndex()); + } + }); + } + protected CompletableFuture internalPeekNthMessageAsync(String subName, int messagePosition, boolean authoritative) { CompletableFuture ret; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index a8e124c84a250..d13ce61753d86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -320,6 +321,55 @@ public CompletableFuture readLastEntryOfCompactedLedger() { }); } + CompletableFuture findFirstMatchEntry(final Predicate predicate) { + var compactedTopicContextFuture = this.getCompactedTopicContextFuture(); + + if (compactedTopicContextFuture == null) { + return CompletableFuture.completedFuture(null); + } + return compactedTopicContextFuture.thenCompose(compactedTopicContext -> { + LedgerHandle lh = compactedTopicContext.getLedger(); + CompletableFuture promise = new CompletableFuture<>(); + findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh); + return promise.thenCompose(index -> { + if (index == null) { + return CompletableFuture.completedFuture(null); + } + return readEntries(lh, index, index).thenApply(entries -> entries.get(0)); + }); + }); + } + private static void findFirstMatchIndexLoop(final Predicate predicate, + final long start, final long end, + final CompletableFuture promise, + final Long lastMatchIndex, + final LedgerHandle lh) { + if (start > end) { + promise.complete(lastMatchIndex); + return; + } + + long mid = (start + end) / 2; + readEntries(lh, mid, mid).thenAccept(entries -> { + Entry entry = entries.get(0); + final boolean isMatch; + try { + isMatch = predicate.test(entry); + } finally { + entry.release(); + } + + if (isMatch) { + findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh); + } else { + findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh); + } + }).exceptionally(ex -> { + promise.completeExceptionally(ex); + return null; + }); + } + private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) { return ComparisonChain.start() .compare(p.getLedgerId(), m.getLedgerId()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index 1d3f94dcb9048..16543bc7aa77f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -22,7 +22,6 @@ import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY; import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED; import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint; -import static org.apache.pulsar.compaction.CompactedTopicImpl.readEntries; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -33,7 +32,6 @@ import java.util.function.Supplier; import javax.annotation.Nonnull; import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -116,7 +114,7 @@ public CompletableFuture findEntryByPublishTime(long publishTime) { final Predicate predicate = entry -> { return Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= publishTime; }; - return findFirstMatchEntry(predicate); + return compactedTopic.findFirstMatchEntry(predicate); } @Override @@ -128,57 +126,7 @@ public CompletableFuture findEntryByEntryIndex(long entryIndex) { } return brokerEntryMetadata.getIndex() >= entryIndex; }; - return findFirstMatchEntry(predicate); - } - - private CompletableFuture findFirstMatchEntry(final Predicate predicate) { - var compactedTopicContextFuture = compactedTopic.getCompactedTopicContextFuture(); - - if (compactedTopicContextFuture == null) { - return CompletableFuture.completedFuture(null); - } - return compactedTopicContextFuture.thenCompose(compactedTopicContext -> { - LedgerHandle lh = compactedTopicContext.getLedger(); - CompletableFuture promise = new CompletableFuture<>(); - findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh); - return promise.thenCompose(index -> { - if (index == null) { - return CompletableFuture.completedFuture(null); - } - return readEntries(lh, index, index).thenApply(entries -> entries.get(0)); - }); - }); - } - - private static void findFirstMatchIndexLoop(final Predicate predicate, - final long start, final long end, - final CompletableFuture promise, - final Long lastMatchIndex, - final LedgerHandle lh) { - if (start > end) { - promise.complete(lastMatchIndex); - return; - } - - long mid = (start + end) / 2; - readEntries(lh, mid, mid).thenAccept(entries -> { - Entry entry = entries.get(0); - final boolean isMatch; - try { - isMatch = predicate.test(entry); - } finally { - entry.release(); - } - - if (isMatch) { - findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh); - } else { - findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh); - } - }).exceptionally(ex -> { - promise.completeExceptionally(ex); - return null; - }); + return compactedTopic.findFirstMatchEntry(predicate); } public CompactedTopicImpl getCompactedTopic() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 7939b19283946..6a81ffe3aba97 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -31,6 +31,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.ArrayList; @@ -65,6 +67,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; @@ -87,6 +90,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -1459,6 +1463,69 @@ public void onSendAcknowledgement(Producer producer, Message message, MessageId .compareTo(id2) > 0); } + @Test + public void testGetMessageIdByTimestampWithCompaction() throws Exception { + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test")); + final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction"; + admin.topics().createNonPartitionedTopic(topicName); + + Map publishTimeMap = new ConcurrentHashMap<>(); + @Cleanup + ProducerBase producer = (ProducerBase) pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .intercept(new ProducerInterceptor() { + @Override + public void close() { + + } + + @Override + public boolean eligible(Message message) { + return true; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + publishTimeMap.put(message.getMessageId(), message.getPublishTime()); + } + }) + .create(); + + MessageId id1 = producer.newMessage().key("K1").value("test1".getBytes()).send(); + MessageId id2 = producer.newMessage().key("K2").value("test2".getBytes()).send(); + + long publish1 = publishTimeMap.get(id1); + long publish2 = publishTimeMap.get(id2); + Assert.assertTrue(publish1 < publish2); + + admin.topics().triggerCompaction(topicName); + Awaitility.await().untilAsserted(() -> + assertSame(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.SUCCESS)); + + admin.topics().unload(topicName); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false); + assertEquals(internalStats.ledgers.size(), 1); + assertEquals(internalStats.ledgers.get(0).entries, 0); + }); + + Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1); + Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1); + Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2); + Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2); + Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1) + .compareTo(id2) > 0); + } + @Test public void testGetBatchMessageIdByTimestamp() throws Exception { TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));