diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index c420767d1e884..1d3f94dcb9048 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -22,18 +22,23 @@ import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY; import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED; import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint; +import static org.apache.pulsar.compaction.CompactedTopicImpl.readEntries; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; @@ -106,6 +111,76 @@ public CompletableFuture getLastCompactedPosition() { return CompletableFuture.completedFuture(compactedTopic.getCompactionHorizon().orElse(null)); } + @Override + public CompletableFuture findEntryByPublishTime(long publishTime) { + final Predicate predicate = entry -> { + return Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= publishTime; + }; + return findFirstMatchEntry(predicate); + } + + @Override + public CompletableFuture findEntryByEntryIndex(long entryIndex) { + final Predicate predicate = entry -> { + BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); + if (brokerEntryMetadata == null || !brokerEntryMetadata.hasIndex()) { + return false; + } + return brokerEntryMetadata.getIndex() >= entryIndex; + }; + return findFirstMatchEntry(predicate); + } + + private CompletableFuture findFirstMatchEntry(final Predicate predicate) { + var compactedTopicContextFuture = compactedTopic.getCompactedTopicContextFuture(); + + if (compactedTopicContextFuture == null) { + return CompletableFuture.completedFuture(null); + } + return compactedTopicContextFuture.thenCompose(compactedTopicContext -> { + LedgerHandle lh = compactedTopicContext.getLedger(); + CompletableFuture promise = new CompletableFuture<>(); + findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh); + return promise.thenCompose(index -> { + if (index == null) { + return CompletableFuture.completedFuture(null); + } + return readEntries(lh, index, index).thenApply(entries -> entries.get(0)); + }); + }); + } + + private static void findFirstMatchIndexLoop(final Predicate predicate, + final long start, final long end, + final CompletableFuture promise, + final Long lastMatchIndex, + final LedgerHandle lh) { + if (start > end) { + promise.complete(lastMatchIndex); + return; + } + + long mid = (start + end) / 2; + readEntries(lh, mid, mid).thenAccept(entries -> { + Entry entry = entries.get(0); + final boolean isMatch; + try { + isMatch = predicate.test(entry); + } finally { + entry.release(); + } + + if (isMatch) { + findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh); + } else { + findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh); + } + }).exceptionally(ex -> { + promise.completeExceptionally(ex); + return null; + }); + } + public CompactedTopicImpl getCompactedTopic() { return compactedTopic; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java index 74df0dafabdcd..fdd6bebbdec33 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java @@ -60,4 +60,21 @@ public interface TopicCompactionService extends AutoCloseable { * @return a future that will be completed with the last compacted position, this position can be null. */ CompletableFuture getLastCompactedPosition(); + + /** + * Find the first entry that greater or equal to target publishTime. + * + * @param publishTime the publish time of entry. + * @return the first entry metadata that greater or equal to target publishTime, this entry can be null. + */ + CompletableFuture findEntryByPublishTime(long publishTime); + + /** + * Find the first entry that greater or equal to target entryIndex, + * if an entry that broker entry metadata is missed, then it will be skipped and find the next match entry. + * + * @param entryIndex the index of entry. + * @return the first entry that greater or equal to target entryIndex, this entry can be null. + */ + CompletableFuture findEntryByEntryIndex(long entryIndex); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index 4abe00fb0c631..d84d1ccc9ea45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -21,6 +21,8 @@ import static org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.testng.Assert.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.AssertJUnit.fail; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.List; @@ -35,13 +37,14 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.protocol.Commands; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -55,6 +58,8 @@ public class TopicCompactionServiceTest extends MockedPulsarServiceBaseTest { @BeforeMethod @Override public void setup() throws Exception { + conf.setExposingBrokerEntryMetadataToClientEnabled(true); + super.internalSetup(); admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); @@ -82,7 +87,7 @@ public void cleanup() throws Exception { } @Test - public void test() throws PulsarClientException, PulsarAdminException { + public void test() throws Exception { String topic = "persistent://prop-xyz/ns1/my-topic"; PulsarTopicCompactionService service = new PulsarTopicCompactionService(topic, bk, () -> compactor); @@ -93,6 +98,18 @@ public void test() throws PulsarClientException, PulsarAdminException { .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); + producer.newMessage() + .key("c") + .value("C_0".getBytes()) + .send(); + + conf.setBrokerEntryMetadataInterceptors(org.assertj.core.util.Sets.newTreeSet( + "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor" + )); + restartBroker(); + + long startTime = System.currentTimeMillis(); + producer.newMessage() .key("a") .value("A_1".getBytes()) @@ -133,7 +150,7 @@ public void test() throws PulsarClientException, PulsarAdminException { assertEquals(admin.topics().getInternalStats(topic).lastConfirmedEntry, lastCompactedPosition.toString()); List entries = service.readCompactedEntries(PositionImpl.EARLIEST, 4).join(); - assertEquals(entries.size(), 2); + assertEquals(entries.size(), 3); entries.stream().map(e -> { try { return MessageImpl.deserialize(e.getDataBuffer()); @@ -144,12 +161,40 @@ public void test() throws PulsarClientException, PulsarAdminException { String data = new String(message.getData()); if (Objects.equals(message.getKey(), "a")) { assertEquals(data, "A_2"); - } else { + } else if (Objects.equals(message.getKey(), "b")) { assertEquals(data, "B_3"); + } else if (Objects.equals(message.getKey(), "c")) { + assertEquals(data, "C_0"); + } else { + fail(); } }); List entries2 = service.readCompactedEntries(PositionImpl.EARLIEST, 1).join(); assertEquals(entries2.size(), 1); + + Entry entry = service.findEntryByEntryIndex(0).join(); + BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); + assertNotNull(brokerEntryMetadata); + assertEquals(brokerEntryMetadata.getIndex(), 2); + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + assertEquals(metadata.getPartitionKey(), "a"); + entry.release(); + + entry = service.findEntryByEntryIndex(3).join(); + brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); + assertNotNull(brokerEntryMetadata); + assertEquals(brokerEntryMetadata.getIndex(), 4); + metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + assertEquals(metadata.getPartitionKey(), "b"); + entry.release(); + + entry = service.findEntryByPublishTime(startTime).join(); + brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); + assertNotNull(brokerEntryMetadata); + assertEquals(brokerEntryMetadata.getIndex(), 2); + metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + assertEquals(metadata.getPartitionKey(), "a"); + entry.release(); } }