Skip to content

Commit

Permalink
[fix][broker] Fix issue with GetMessageIdByTimestamp can't find match…
Browse files Browse the repository at this point in the history
… messageId from compacted ledger (#21600)
  • Loading branch information
coderzc authored and Technoboy- committed Jan 22, 2024
1 parent bb6f836 commit b54c5aa
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2882,28 +2882,62 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a non-persistent topic is not allowed");
}
ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
return ledger.asyncFindPosition(entry -> {
final PersistentTopic persistentTopic = (PersistentTopic) topic;

return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> {
if (lastEntry == null) {
return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
}
MessageMetadata metadata;
Position position = lastEntry.getPosition();
try {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}] Error deserializing message for message position find", topicName, e);
metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer());
} finally {
entry.release();
lastEntry.release();
}
return false;
}).thenApply(position -> {
if (position == null) {
return null;
if (timestamp == metadata.getPublishTime()) {
return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(),
position.getEntryId(), topicName.getPartitionIndex()));
} else if (timestamp < metadata.getPublishTime()) {
return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
.thenApply(compactedEntry -> {
try {
return new MessageIdImpl(compactedEntry.getLedgerId(),
compactedEntry.getEntryId(), topicName.getPartitionIndex());
} finally {
compactedEntry.release();
}
});
} else {
return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
topicName.getPartitionIndex());
return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
}
});
});
}

private CompletableFuture<MessageId> findMessageIdByPublishTime(long timestamp, ManagedLedger managedLedger) {
return managedLedger.asyncFindPosition(entry -> {
try {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
} catch (Exception e) {
log.error("[{}] Error deserializing message for message position find",
topicName,
e);
} finally {
entry.release();
}
return false;
}).thenApply(position -> {
if (position == null) {
return null;
} else {
return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
topicName.getPartitionIndex());
}
});
}

protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
boolean authoritative) {
CompletableFuture<Void> ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -320,6 +321,55 @@ public CompletableFuture<Entry> readLastEntryOfCompactedLedger() {
});
}

CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
var compactedTopicContextFuture = this.getCompactedTopicContextFuture();

if (compactedTopicContextFuture == null) {
return CompletableFuture.completedFuture(null);
}
return compactedTopicContextFuture.thenCompose(compactedTopicContext -> {
LedgerHandle lh = compactedTopicContext.getLedger();
CompletableFuture<Long> promise = new CompletableFuture<>();
findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh);
return promise.thenCompose(index -> {
if (index == null) {
return CompletableFuture.completedFuture(null);
}
return readEntries(lh, index, index).thenApply(entries -> entries.get(0));
});
});
}
private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,
final long start, final long end,
final CompletableFuture<Long> promise,
final Long lastMatchIndex,
final LedgerHandle lh) {
if (start > end) {
promise.complete(lastMatchIndex);
return;
}

long mid = (start + end) / 2;
readEntries(lh, mid, mid).thenAccept(entries -> {
Entry entry = entries.get(0);
final boolean isMatch;
try {
isMatch = predicate.test(entry);
} finally {
entry.release();
}

if (isMatch) {
findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh);
} else {
findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh);
}
}).exceptionally(ex -> {
promise.completeExceptionally(ex);
return null;
});
}

private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
return ComparisonChain.start()
.compare(p.getLedgerId(), m.getLedgerId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
import static org.apache.pulsar.compaction.CompactedTopicImpl.readEntries;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand All @@ -33,7 +32,6 @@
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -116,7 +114,7 @@ public CompletableFuture<Entry> findEntryByPublishTime(long publishTime) {
final Predicate<Entry> predicate = entry -> {
return Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= publishTime;
};
return findFirstMatchEntry(predicate);
return compactedTopic.findFirstMatchEntry(predicate);
}

@Override
Expand All @@ -128,57 +126,7 @@ public CompletableFuture<Entry> findEntryByEntryIndex(long entryIndex) {
}
return brokerEntryMetadata.getIndex() >= entryIndex;
};
return findFirstMatchEntry(predicate);
}

private CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
var compactedTopicContextFuture = compactedTopic.getCompactedTopicContextFuture();

if (compactedTopicContextFuture == null) {
return CompletableFuture.completedFuture(null);
}
return compactedTopicContextFuture.thenCompose(compactedTopicContext -> {
LedgerHandle lh = compactedTopicContext.getLedger();
CompletableFuture<Long> promise = new CompletableFuture<>();
findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh);
return promise.thenCompose(index -> {
if (index == null) {
return CompletableFuture.completedFuture(null);
}
return readEntries(lh, index, index).thenApply(entries -> entries.get(0));
});
});
}

private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,
final long start, final long end,
final CompletableFuture<Long> promise,
final Long lastMatchIndex,
final LedgerHandle lh) {
if (start > end) {
promise.complete(lastMatchIndex);
return;
}

long mid = (start + end) / 2;
readEntries(lh, mid, mid).thenAccept(entries -> {
Entry entry = entries.get(0);
final boolean isMatch;
try {
isMatch = predicate.test(entry);
} finally {
entry.release();
}

if (isMatch) {
findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh);
} else {
findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh);
}
}).exceptionally(ex -> {
promise.completeExceptionally(ex);
return null;
});
return compactedTopic.findFirstMatchEntry(predicate);
}

public CompactedTopicImpl getCompactedTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.ArrayList;
Expand Down Expand Up @@ -65,6 +67,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
Expand All @@ -87,6 +90,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand Down Expand Up @@ -1459,6 +1463,69 @@ public void onSendAcknowledgement(Producer producer, Message message, MessageId
.compareTo(id2) > 0);
}

@Test
public void testGetMessageIdByTimestampWithCompaction() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction";
admin.topics().createNonPartitionedTopic(topicName);

Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
@Cleanup
ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.intercept(new ProducerInterceptor() {
@Override
public void close() {

}

@Override
public boolean eligible(Message message) {
return true;
}

@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
publishTimeMap.put(message.getMessageId(), message.getPublishTime());
}
})
.create();

MessageId id1 = producer.newMessage().key("K1").value("test1".getBytes()).send();
MessageId id2 = producer.newMessage().key("K2").value("test2".getBytes()).send();

long publish1 = publishTimeMap.get(id1);
long publish2 = publishTimeMap.get(id2);
Assert.assertTrue(publish1 < publish2);

admin.topics().triggerCompaction(topicName);
Awaitility.await().untilAsserted(() ->
assertSame(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS));

admin.topics().unload(topicName);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
assertEquals(internalStats.ledgers.size(), 1);
assertEquals(internalStats.ledgers.get(0).entries, 0);
});

Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}

@Test
public void testGetBatchMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
Expand Down

0 comments on commit b54c5aa

Please sign in to comment.