diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java index 93bf2349103c3b..9dba4ae34fea6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java @@ -19,21 +19,33 @@ package org.apache.pulsar.broker.admin; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -140,4 +152,55 @@ public void testGetStats(String topic) throws Exception { } assertTrue(stats.getSubscriptions().containsKey(subscriptionName)); } + + @Test + public void testGetMessagesId() throws PulsarClientException, ExecutionException, InterruptedException { + String topic = newTopicName(); + + int numMessages = 10; + int batchingMaxMessages = numMessages / 2; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(true) + .batchingMaxMessages(batchingMaxMessages) + .batchingMaxPublishDelay(60, TimeUnit.SECONDS) + .create(); + + List> futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + futures.add(producer.sendAsync(("msg-" + i).getBytes(UTF_8))); + } + FutureUtil.waitForAll(futures).get(); + + Map messageIdMap = new HashMap<>(); + futures.forEach(n -> { + try { + MessageId messageId = n.get(); + if (messageId instanceof MessageIdImpl impl) { + MessageIdImpl key = new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1); + Integer i = messageIdMap.computeIfAbsent(key, __ -> 0); + messageIdMap.put(key, i + 1); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + messageIdMap.forEach((key, value) -> { + assertEquals(value, batchingMaxMessages); + try { + List> messages = admin.topics().getMessagesById(topic, + key.getLedgerId(), key.getEntryId()); + assertNotNull(messages); + assertEquals(messages.size(), batchingMaxMessages); + } catch (PulsarAdminException e) { + throw new RuntimeException(e); + } + }); + + // The message id doesn't exist. + assertThrows(Exception.class, () -> admin.topics().getMessagesById(topic, 1024, 2048)); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index cace5cda7bd5b5..574b859e82c801 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1679,7 +1679,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @return the message indexed by the messageId * @throws PulsarAdminException * Unexpected error + * @deprecated Using {@link #getMessagesById(String, long, long)} instead. */ + @Deprecated Message getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException; /** @@ -1691,9 +1693,32 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @param entryId * Entry id * @return a future that can be used to track when the message is returned + * @deprecated Using {@link #getMessagesByIdAsync(String, long, long)} instead. */ + @Deprecated CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId); + /** + * Get the messages by messageId. + * + * @param topic Topic name + * @param ledgerId Ledger id + * @param entryId Entry id + * @return A set of messages. + * @throws PulsarAdminException Unexpected error + */ + List> getMessagesById(String topic, long ledgerId, long entryId) throws PulsarAdminException; + + /** + * Get the messages by messageId asynchronously. + * + * @param topic Topic name + * @param ledgerId Ledger id + * @param entryId Entry id + * @return A future that can be used to track when a set of messages is returned. + */ + CompletableFuture>> getMessagesByIdAsync(String topic, long ledgerId, long entryId); + /** * Get message ID published at or just after this absolute timestamp (in ms). * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9d09d96073d9e9..9d990fc09fc4bc 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -985,35 +985,35 @@ public CompletableFuture truncateAsync(String topic) { return asyncDeleteRequest(path); } + @Deprecated @Override public CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId) { - CompletableFuture> future = new CompletableFuture<>(); - getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> { - if (ex != null) { - if (ex instanceof NotFoundException) { - log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage()); - future.complete(r); - } else { - future.completeExceptionally(ex); - } + return getMessagesByIdAsync(topic, ledgerId, entryId).thenApply(n -> { + if (n == null || n.isEmpty()) { return null; } - future.complete(r); - return null; + return n.get(0); }); - return future; } - private CompletableFuture> getRemoteMessageById(String topic, long ledgerId, long entryId) { + @Deprecated + @Override + public Message getMessageById(String topic, long ledgerId, long entryId) + throws PulsarAdminException { + return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId)); + } + + @Override + public CompletableFuture>> getMessagesByIdAsync(String topic, long ledgerId, long entryId) { TopicName topicName = validateTopic(topic); WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); - final CompletableFuture> future = new CompletableFuture<>(); + final CompletableFuture>> future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() { @Override public void completed(Response response) { try { - future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0)); + future.complete(getMessagesFromHttpResponse(topicName.toString(), response)); } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -1028,9 +1028,9 @@ public void failed(Throwable throwable) { } @Override - public Message getMessageById(String topic, long ledgerId, long entryId) + public List> getMessagesById(String topic, long ledgerId, long entryId) throws PulsarAdminException { - return sync(() -> getMessageByIdAsync(topic, ledgerId, entryId)); + return sync(() -> getMessagesByIdAsync(topic, ledgerId, entryId)); } @Override