Skip to content

Commit

Permalink
[feat][admin] PIP-330: getMessagesById gets all messages
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Jan 18, 2024
1 parent 24c927e commit b670d13
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,33 @@
package org.apache.pulsar.broker.admin;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -140,4 +152,55 @@ public void testGetStats(String topic) throws Exception {
}
assertTrue(stats.getSubscriptions().containsKey(subscriptionName));
}

@Test
public void testGetMessagesId() throws PulsarClientException, ExecutionException, InterruptedException {
String topic = newTopicName();

int numMessages = 10;
int batchingMaxMessages = numMessages / 2;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(batchingMaxMessages)
.batchingMaxPublishDelay(60, TimeUnit.SECONDS)
.create();

List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
futures.add(producer.sendAsync(("msg-" + i).getBytes(UTF_8)));
}
FutureUtil.waitForAll(futures).get();

Map<MessageIdImpl, Integer> messageIdMap = new HashMap<>();
futures.forEach(n -> {
try {
MessageId messageId = n.get();
if (messageId instanceof MessageIdImpl impl) {
MessageIdImpl key = new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1);
Integer i = messageIdMap.computeIfAbsent(key, __ -> 0);
messageIdMap.put(key, i + 1);
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

messageIdMap.forEach((key, value) -> {
assertEquals(value, batchingMaxMessages);
try {
List<Message<byte[]>> messages = admin.topics().getMessagesById(topic,
key.getLedgerId(), key.getEntryId());
assertNotNull(messages);
assertEquals(messages.size(), batchingMaxMessages);
} catch (PulsarAdminException e) {
throw new RuntimeException(e);
}
});

// The message id doesn't exist.
assertThrows(Exception.class, () -> admin.topics().getMessagesById(topic, 1024, 2048));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1679,7 +1679,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
* @deprecated Using {@link #getMessagesById(String, long, long)} instead.
*/
@Deprecated
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
Expand All @@ -1691,9 +1693,32 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
* @deprecated Using {@link #getMessagesByIdAsync(String, long, long)} instead.
*/
@Deprecated
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);

/**
* Get the messages by messageId.
*
* @param topic Topic name
* @param ledgerId Ledger id
* @param entryId Entry id
* @return A set of messages.
* @throws PulsarAdminException Unexpected error
*/
List<Message<byte[]>> getMessagesById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
* Get the messages by messageId asynchronously.
*
* @param topic Topic name
* @param ledgerId Ledger id
* @param entryId Entry id
* @return A future that can be used to track when a set of messages is returned.
*/
CompletableFuture<List<Message<byte[]>>> getMessagesByIdAsync(String topic, long ledgerId, long entryId);

/**
* Get message ID published at or just after this absolute timestamp (in ms).
* @param topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,35 +985,35 @@ public CompletableFuture<Void> truncateAsync(String topic) {
return asyncDeleteRequest(path);
}

@Deprecated
@Override
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage());
future.complete(r);
} else {
future.completeExceptionally(ex);
}
return getMessagesByIdAsync(topic, ledgerId, entryId).thenApply(n -> {
if (n == null || n.isEmpty()) {
return null;
}
future.complete(r);
return null;
return n.get(0);
});
return future;
}

private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId) {
@Deprecated
@Override
public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
throws PulsarAdminException {
return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId));
}

@Override
public CompletableFuture<List<Message<byte[]>>> getMessagesByIdAsync(String topic, long ledgerId, long entryId) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId));
final CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
final CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0));
future.complete(getMessagesFromHttpResponse(topicName.toString(), response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
Expand All @@ -1028,9 +1028,9 @@ public void failed(Throwable throwable) {
}

@Override
public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
public List<Message<byte[]>> getMessagesById(String topic, long ledgerId, long entryId)
throws PulsarAdminException {
return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId));
return sync(() -> getMessagesByIdAsync(topic, ledgerId, entryId));
}

@Override
Expand Down

0 comments on commit b670d13

Please sign in to comment.