From 25df99e290fef06af8bb1bd5211e1787cf1a6d19 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Fri, 14 Feb 2020 16:45:44 -0800 Subject: [PATCH 1/8] add get-message-by-id cmd into pulsar-admin --- .../bookkeeper/mledger/ManagedCursor.java | 16 ++ .../mledger/impl/ManagedCursorImpl.java | 11 ++ .../impl/ManagedCursorContainerTest.java | 5 + .../admin/impl/PersistentTopicsBase.java | 148 +++++++++++------- .../broker/admin/v1/PersistentTopics.java | 16 ++ .../broker/admin/v2/PersistentTopics.java | 32 ++++ .../pulsar/broker/service/Subscription.java | 3 + .../NonPersistentSubscription.java | 6 + .../persistent/PersistentReplicator.java | 22 +++ .../persistent/PersistentSubscription.java | 24 ++- .../apache/pulsar/client/admin/Topics.java | 30 ++++ .../client/admin/internal/TopicsImpl.java | 72 ++++++++- .../apache/pulsar/admin/cli/CliCommand.java | 2 +- .../pulsar/admin/cli/CmdPersistentTopics.java | 31 ++++ .../apache/pulsar/admin/cli/CmdTopics.java | 31 ++++ 15 files changed, 385 insertions(+), 64 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index ae0426935637c..5a0ea45524e44 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -131,6 +131,22 @@ Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback, Object ctx); + + /** + * Asynchronously get a message by its id. + * + * @param ledgerId + * ledger id + * @param entryId + * entry id + * @param deletedEntries + * skip individual deleted entries + * @param callback + * @param ctx + */ + void asyncGetMessageById(long ledgerId, long entryId, IndividualDeletedEntries deletedEntries, + ReadEntryCallback callback, Object ctx); + /** * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 158a2ce889a81..af3b34871c26c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -550,6 +550,17 @@ public void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, Rea } } + @Override + public void asyncGetMessageById(long ledgerId, long entryId, IndividualDeletedEntries deletedEntries, + ReadEntryCallback callback, Object ctx) { + if (isClosed()) { + callback.readEntryFailed(new ManagedLedgerException("Cursor was already closed"), ctx); + return; + } + + ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), callback, ctx); + } + @Override public List readEntriesOrWait(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 21f5747919b2f..9e0f9eb6cad0d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -274,6 +274,11 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea Object ctx) { } + @Override + public void asyncGetMessageById(long ledgerId, long entryId, IndividualDeletedEntries deletedEntries, + ReadEntryCallback callback, Object ctx) { + } + @Override public void setActive() { } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0add56e311cc4..3a9da51c5b3c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -31,7 +31,6 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; -import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -1403,21 +1402,41 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati } } - protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) { - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); - if (partitionMetadata.partitions > 0) { - throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed"); + protected Response internalGetMessageById(String subName, long ledgerId, long entryId, boolean authoritative) { + verifyReadOperation(subName, authoritative); + + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + PersistentReplicator replicator = null; + PersistentSubscription subscription = null; + Entry entry = null; + if (subName.startsWith(topic.getReplicatorPrefix())) { + replicator = getReplicatorReference(subName, topic); + } else { + subscription = (PersistentSubscription) getSubscriptionReference(subName, topic); } - validateAdminAccessForSubscriber(subName, authoritative); - if (!(getTopicReference(topicName) instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName, - subName); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Skip messages on a non-persistent topic is not allowed"); + try { + if (subName.startsWith(topic.getReplicatorPrefix())) { + entry = replicator.getMessageById(ledgerId, entryId).get(); + } else { + entry = subscription.getMessageById(ledgerId, entryId).get(); + } + return generateResponseWithEntry(entry); + } catch (NullPointerException npe) { + throw new RestException(Status.NOT_FOUND, "Message not found"); + } catch (Exception exception) { + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {} {}", + clientAppId(), ledgerId, entryId, topicName, subName, exception); + throw new RestException(exception); + } finally { + if (entry != null) { + entry.release(); + } } + } + + protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) { + verifyReadOperation(subName, authoritative); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); PersistentReplicator repl = null; PersistentSubscription sub = null; @@ -1433,48 +1452,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b } else { entry = sub.peekNthMessage(messagePosition).get(); } - checkNotNull(entry); - PositionImpl pos = (PositionImpl) entry.getPosition(); - ByteBuf metadataAndPayload = entry.getDataBuffer(); - - // moves the readerIndex to the payload - MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); - - ResponseBuilder responseBuilder = Response.ok(); - responseBuilder.header("X-Pulsar-Message-ID", pos.toString()); - for (KeyValue keyValue : metadata.getPropertiesList()) { - responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue()); - } - if (metadata.hasPublishTime()) { - responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime())); - } - if (metadata.hasEventTime()) { - responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime())); - } - if (metadata.hasNumMessagesInBatch()) { - responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); - } - - // Decode if needed - CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); - ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize()); - - // Copy into a heap buffer for output stream compatibility - ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(), - uncompressedPayload.readableBytes()); - data.writeBytes(uncompressedPayload); - uncompressedPayload.release(); - - StreamingOutput stream = new StreamingOutput() { - - @Override - public void write(OutputStream output) throws IOException, WebApplicationException { - output.write(data.array(), data.arrayOffset(), data.readableBytes()); - data.release(); - } - }; - - return responseBuilder.entity(stream).build(); + return generateResponseWithEntry(entry); } catch (NullPointerException npe) { throw new RestException(Status.NOT_FOUND, "Message not found"); } catch (Exception exception) { @@ -1488,6 +1466,64 @@ public void write(OutputStream output) throws IOException, WebApplicationExcepti } } + private void verifyReadOperation(String subName, boolean authoritative) { + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); + if (partitionMetadata.partitions > 0) { + throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed"); + } + validateAdminAccessForSubscriber(subName, authoritative); + if (!(getTopicReference(topicName) instanceof PersistentTopic)) { + log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName, + subName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Skip messages on a non-persistent topic is not allowed"); + } + } + + private Response generateResponseWithEntry(Entry entry) throws IOException { + checkNotNull(entry); + PositionImpl pos = (PositionImpl) entry.getPosition(); + ByteBuf metadataAndPayload = entry.getDataBuffer(); + + // moves the readerIndex to the payload + MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); + + ResponseBuilder responseBuilder = Response.ok(); + responseBuilder.header("X-Pulsar-Message-ID", pos.toString()); + for (KeyValue keyValue : metadata.getPropertiesList()) { + responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue()); + } + if (metadata.hasPublishTime()) { + responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime())); + } + if (metadata.hasEventTime()) { + responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime())); + } + if (metadata.hasNumMessagesInBatch()) { + responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); + } + + // Decode if needed + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); + ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize()); + + // Copy into a heap buffer for output stream compatibility + ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(), + uncompressedPayload.readableBytes()); + data.writeBytes(uncompressedPayload); + uncompressedPayload.release(); + + StreamingOutput stream = output -> { + output.write(data.array(), data.arrayOffset(), data.readableBytes()); + data.release(); + }; + + return responseBuilder.entity(stream).build(); + } + protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 203555c66a6a6..bb5d6501d7bba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -537,6 +537,22 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative); } + @GET + @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/ledger/{ledgerId}/entry/{entryId}") + @ApiOperation(hidden = true, value = "Get message by its messageId.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 403, message = "Don't java admin permission"), + @ApiResponse(code = 404, message = "Topic, subscription or the messageId does not exist") + }) + public Response getMessageByID(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @PathParam("subName") String encodedSubName, @PathParam("ledgerId") Long ledgerId, + @PathParam("entryId") Long entryId, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(property, cluster, namespace, encodedTopic); + return internalGetMessageById(decode(encodedSubName), ledgerId, entryId, authoritative); + } + @GET @Path("{property}/{cluster}/{namespace}/{topic}/backlog") @ApiOperation(hidden = true, value = "Get estimated backlog for offline topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index d47d8b587b00c..98cb6a3605e0c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -884,6 +884,38 @@ public Response peekNthMessage( return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative); } + @GET + @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/ledger/{ledgerId}/entry/{entryId}") + @ApiOperation(value = "Get message by its messageId.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"), + @ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"), + @ApiResponse(code = 412, message = "Topic name is not valid"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) + public Response getMessageById( + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(name = "subName", value = "Subscribed message expired", required = true) + @PathParam("subName") String encodedSubName, + @ApiParam(value = "The ledger id", required = true) + @PathParam("ledgerId") long ledgerId, + @ApiParam(value = "The entry id", required = true) + @PathParam("entryId") long entryId, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + return internalGetMessageById(decode(encodedSubName), ledgerId, entryId, authoritative); + } + @GET @Path("{tenant}/{namespace}/{topic}/backlog") @ApiOperation(value = "Get estimated backlog for offline topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 18c7c49ae37f5..e4c5a354b0f18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot; @@ -79,6 +80,8 @@ default long getNumberOfEntriesDelayed() { CompletableFuture peekNthMessage(int messagePosition); + CompletableFuture getMessageById(long ledgerId, long entryId); + void expireMessages(int messageTTLInSeconds); void redeliverUnacknowledgedMessages(Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 652042000b6ac..23a0d7e05464a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -249,6 +249,12 @@ public CompletableFuture peekNthMessage(int messagePosition) { return CompletableFuture.completedFuture(null);// TODO: throw exception } + @Override + public CompletableFuture getMessageById(long ledgerId, long entryId) { + // No-op + return CompletableFuture.completedFuture(null); // TODO: throw exception + } + @Override public long getNumberOfEntriesInBacklog() { // No-op diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 2bc18ca384709..db355a8b6e306 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -583,6 +583,28 @@ public void readEntryComplete(Entry entry, Object ctx) { return future; } + public CompletableFuture getMessageById(long ledgerId, long entryId) { + CompletableFuture future = new CompletableFuture<>(); + + if (log.isDebugEnabled()) { + log.debug("[{}][{} -> {}] Getting message at position {} {}", topicName, localCluster, remoteCluster, + ledgerId, entryId); + } + + cursor.asyncGetMessageById(ledgerId, entryId, IndividualDeletedEntries.Exclude, new ReadEntryCallback() { + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + future.complete(entry); + } + }, null); + return future; + } + @Override public void deleteComplete(Object ctx) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 88d8e45c8b3cc..aeb22b631980d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -44,7 +44,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; @@ -775,6 +774,29 @@ public void readEntryComplete(Entry entry, Object ctx) { return future; } + @Override + public CompletableFuture getMessageById(long ledgerId, long entryId) { + CompletableFuture future = new CompletableFuture<>(); + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Getting message for ledgerId {} entryId {}", topicName, subName, ledgerId, entryId); + } + + cursor.asyncGetMessageById(ledgerId, entryId, IndividualDeletedEntries.Exclude, new ReadEntryCallback() { + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + future.complete(entry); + } + }, null); + + return future; + } + @Override public long getNumberOfEntriesInBacklog() { return cursor.getNumberOfEntriesInBacklog(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index c5ec04120c022..252ad99fa5695 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -938,6 +938,36 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) */ CompletableFuture>> peekMessagesAsync(String topic, String subName, int numMessages); + /** + * Get a message by its messageId via a topic subscription + * @param topic + * Topic name + * @param subName + * Subscription name + * @param ledgerId + * Ledger id + * @param entryId + * Entry id + * @return the message indexed by the messageId + * @throws PulsarAdminException + * Unexpected error + */ + Message getMessageById(String topic, String subName, long ledgerId, long entryId) throws PulsarAdminException; + + /** + * Get a message by its messageId via a topic subscription asynchronously + * @param topic + * Topic name + * @param subName + * Subscription name + * @param ledgerId + * Ledger id + * @param entryId + * Entry id + * @return a future that can be used to track when the message is returned + */ + CompletableFuture> getMessageByIdAsync(String topic, String subName, long ledgerId, long entryId); + /** * Create a new subscription on a topic * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 75c3d59a55d7c..fb0aab096a7bc 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -83,6 +83,8 @@ public class TopicsImpl extends BaseResource implements Topics { private final WebTarget adminTopics; private final WebTarget adminV2Topics; private final String BATCH_HEADER = "X-Pulsar-num-batch-message"; + private final String MESSAGE_ID = "X-Pulsar-Message-ID"; + private final String PUBLISH_TIME = "X-Pulsar-publish-time"; public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { super(auth, readTimeoutMs); adminTopics = web.path("/admin"); @@ -741,7 +743,7 @@ private CompletableFuture>> peekNthMessage(String topic, St @Override public void completed(Response response) { try { - future.complete(getMessageFromHttpResponse(tn.toString(), response)); + future.complete(getMessagesFromHttpResponse(tn.toString(), response)); } catch (Exception e) { future.completeExceptionally(getApiException(e)); } @@ -778,7 +780,6 @@ public CompletableFuture>> peekMessagesAsync(String topic, return future; } - private void peekMessagesAsync(String topic, String subName, int numMessages, List> messages, CompletableFuture>> future, int nthMessage) { if (numMessages <= 0) { @@ -792,7 +793,7 @@ private void peekMessagesAsync(String topic, String subName, int numMessages, // if we get a not found exception, it means that the position for the message we are trying to get // does not exist. At this point, we can return the already found messages. if (ex instanceof NotFoundException) { - log.warn("Exception '{}' occured while trying to peek Messages.", ex.getMessage()); + log.warn("Exception '{}' occurred while trying to peek Messages.", ex.getMessage()); future.complete(messages); } else { future.completeExceptionally(ex); @@ -807,6 +808,65 @@ private void peekMessagesAsync(String topic, String subName, int numMessages, }); } + @Override + public CompletableFuture> getMessageByIdAsync(String topic, String subName, long ledgerId, long entryId) { + CompletableFuture> future = new CompletableFuture<>(); + getRemoteMessageById(topic, subName, ledgerId, entryId).handle((r, ex) -> { + if (ex != null) { + if (ex instanceof NotFoundException) { + log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage()); + future.complete(r); + } else { + future.completeExceptionally(ex); + } + return null; + } + future.complete(r); + return null; + }); + return future; + } + + private CompletableFuture> getRemoteMessageById(String topic, String subName, + long ledgerId, long entryId) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "subscription", subName, + "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); + final CompletableFuture> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Response response) { + try { + future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0)); + } catch (Exception e) { + future.completeExceptionally(getApiException(e)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public Message getMessageById(String topic, String subName, long ledgerId, long entryId) + throws PulsarAdminException { + try { + return getMessageByIdAsync(topic, subName, ledgerId, entryId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public void createSubscription(String topic, String subscriptionName, MessageId messageId) throws PulsarAdminException { @@ -971,20 +1031,20 @@ private TopicName validateTopic(String topic) { return TopicName.get(topic); } - private List> getMessageFromHttpResponse(String topic, Response response) throws Exception { + private List> getMessagesFromHttpResponse(String topic, Response response) throws Exception { if (response.getStatus() != Status.OK.getStatusCode()) { throw getApiException(response); } - String msgId = response.getHeaderString("X-Pulsar-Message-ID"); + String msgId = response.getHeaderString(MESSAGE_ID); try (InputStream stream = (InputStream) response.getEntity()) { byte[] data = new byte[stream.available()]; stream.read(data); Map properties = Maps.newTreeMap(); MultivaluedMap headers = response.getHeaders(); - Object tmp = headers.getFirst("X-Pulsar-publish-time"); + Object tmp = headers.getFirst(PUBLISH_TIME); if (tmp != null) { properties.put("publish-time", (String) tmp); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java index 5fd97bb1d7ec5..a86d345d7e439 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java @@ -109,7 +109,7 @@ static MessageId validateMessageIdString(String resetMessageIdStr) throws Pulsar return new MessageIdImpl(Long.parseLong(messageId[0]), Long.parseLong(messageId[1]), -1); } catch (Exception e) { throw new PulsarAdminException( - "Invalid reset-position (must be in format: ledgerId:entryId) value " + resetMessageIdStr); + "Invalid message id (must be in format: ledgerId:entryId) value " + resetMessageIdStr); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 9df331fba320a..2b2d42f096404 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -79,6 +79,7 @@ public CmdPersistentTopics(PulsarAdmin admin) { jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); + jcommander.addCommand("get-message-by-id", new GetMessageById()); jcommander.addCommand("reset-cursor", new ResetCursor()); jcommander.addCommand("terminate", new Terminate()); jcommander.addCommand("compact", new Compact()); @@ -564,6 +565,36 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get message by its ledgerId and entryId") + private class GetMessageById extends CliCommand { + @Parameter(description = "persistent://property/cluster/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-s", + "--subscription" }, description = "Subscription to get messages from", required = true) + private String subName; + + @Parameter(names = { "-l", "--ledgerId" }, + description = "ledger id pointing to the desired ledger", + required = true) + private long ledgerId; + + @Parameter(names = { "-e", "--entryId" }, + description = "entry id pointing to the desired entry", + required = true) + private long entryId; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + + Message message = persistentTopics.getMessageById(persistentTopic, subName, ledgerId, entryId); + + ByteBuf date = Unpooled.wrappedBuffer(message.getData()); + System.out.println(ByteBufUtil.prettyHexDump(date)); + } + } + @Parameters(commandDescription = "Compact a topic") private class Compact extends CliCommand { @Parameter(description = "persistent://property/cluster/namespace/topic", required = true) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index af5aa02e7a2a3..a63cc9246a1c9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -89,6 +89,7 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); + jcommander.addCommand("get-message-by-id", new GetMessageById()); jcommander.addCommand("reset-cursor", new ResetCursor()); jcommander.addCommand("terminate", new Terminate()); jcommander.addCommand("compact", new Compact()); @@ -608,6 +609,36 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get message by its ledgerId and entryId") + private class GetMessageById extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-s", + "--subscription" }, description = "Subscription to get messages from", required = true) + private String subName; + + @Parameter(names = { "-l", "--ledgerId" }, + description = "ledger id pointing to the desired ledger", + required = true) + private long ledgerId; + + @Parameter(names = { "-e", "--entryId" }, + description = "entry id pointing to the desired entry", + required = true) + private long entryId; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + + Message message = topics.getMessageById(persistentTopic, subName, ledgerId, entryId); + + ByteBuf date = Unpooled.wrappedBuffer(message.getData()); + System.out.println(ByteBufUtil.prettyHexDump(date)); + } + } + @Parameters(commandDescription = "Compact a topic") private class Compact extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) From 532a76defbb990fabecc037a3a567d322d269c22 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Fri, 14 Feb 2020 22:40:27 -0800 Subject: [PATCH 2/8] remove subscription name parameter from request --- .../admin/impl/PersistentTopicsBase.java | 44 +++++++++---------- .../broker/admin/v1/PersistentTopics.java | 8 ++-- .../broker/admin/v2/PersistentTopics.java | 6 +-- .../apache/pulsar/client/admin/Topics.java | 8 +--- .../client/admin/internal/TopicsImpl.java | 14 +++--- .../pulsar/admin/cli/CmdPersistentTopics.java | 6 +-- .../apache/pulsar/admin/cli/CmdTopics.java | 6 +-- 7 files changed, 38 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3a9da51c5b3c6..480eed880bb99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -1402,24 +1403,17 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati } } - protected Response internalGetMessageById(String subName, long ledgerId, long entryId, boolean authoritative) { - verifyReadOperation(subName, authoritative); + protected Response internalGetMessageById(long ledgerId, long entryId, boolean authoritative) { + verifyReadOperation(authoritative); + String subName = "get-message-id-" + UUID.randomUUID().toString(); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - PersistentReplicator replicator = null; - PersistentSubscription subscription = null; + Entry entry = null; - if (subName.startsWith(topic.getReplicatorPrefix())) { - replicator = getReplicatorReference(subName, topic); - } else { - subscription = (PersistentSubscription) getSubscriptionReference(subName, topic); - } try { - if (subName.startsWith(topic.getReplicatorPrefix())) { - entry = replicator.getMessageById(ledgerId, entryId).get(); - } else { + PersistentSubscription subscription = + (PersistentSubscription) topic.createSubscription(subName, InitialPosition.Earliest, false).get(); entry = subscription.getMessageById(ledgerId, entryId).get(); - } return generateResponseWithEntry(entry); } catch (NullPointerException npe) { throw new RestException(Status.NOT_FOUND, "Message not found"); @@ -1431,11 +1425,24 @@ protected Response internalGetMessageById(String subName, long ledgerId, long en if (entry != null) { entry.release(); } + + try { + topic.unsubscribe(subName).get(); + } catch (Exception exception) { + log.error("Failed to unsubscribe from topic {} with subscription name {}", topicName, subName); + } } } protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) { - verifyReadOperation(subName, authoritative); + verifyReadOperation(authoritative); + validateAdminAccessForSubscriber(subName, authoritative); + if (!(getTopicReference(topicName) instanceof PersistentTopic)) { + log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName, + subName); + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Skip messages on a non-persistent topic is not allowed"); + } PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); PersistentReplicator repl = null; @@ -1466,7 +1473,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b } } - private void verifyReadOperation(String subName, boolean authoritative) { + private void verifyReadOperation(boolean authoritative) { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } @@ -1474,13 +1481,6 @@ private void verifyReadOperation(String subName, boolean authoritative) { if (partitionMetadata.partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed"); } - validateAdminAccessForSubscriber(subName, authoritative); - if (!(getTopicReference(topicName) instanceof PersistentTopic)) { - log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName, - subName); - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Skip messages on a non-persistent topic is not allowed"); - } } private Response generateResponseWithEntry(Entry entry) throws IOException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index bb5d6501d7bba..cee9c66666d76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -538,7 +538,7 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara } @GET - @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/ledger/{ledgerId}/entry/{entryId}") + @Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}") @ApiOperation(hidden = true, value = "Get message by its messageId.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @@ -547,10 +547,10 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara }) public Response getMessageByID(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @PathParam("subName") String encodedSubName, @PathParam("ledgerId") Long ledgerId, - @PathParam("entryId") Long entryId, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @PathParam("ledgerId") Long ledgerId, @PathParam("entryId") Long entryId, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(property, cluster, namespace, encodedTopic); - return internalGetMessageById(decode(encodedSubName), ledgerId, entryId, authoritative); + return internalGetMessageById(ledgerId, entryId, authoritative); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 98cb6a3605e0c..c4bc135716ee3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -885,7 +885,7 @@ public Response peekNthMessage( } @GET - @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/ledger/{ledgerId}/entry/{entryId}") + @Path("/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}") @ApiOperation(value = "Get message by its messageId.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @@ -904,8 +904,6 @@ public Response getMessageById( @PathParam("namespace") String namespace, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(name = "subName", value = "Subscribed message expired", required = true) - @PathParam("subName") String encodedSubName, @ApiParam(value = "The ledger id", required = true) @PathParam("ledgerId") long ledgerId, @ApiParam(value = "The entry id", required = true) @@ -913,7 +911,7 @@ public Response getMessageById( @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - return internalGetMessageById(decode(encodedSubName), ledgerId, entryId, authoritative); + return internalGetMessageById(ledgerId, entryId, authoritative); } @GET diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 252ad99fa5695..56e6fad77ec47 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -942,8 +942,6 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * Get a message by its messageId via a topic subscription * @param topic * Topic name - * @param subName - * Subscription name * @param ledgerId * Ledger id * @param entryId @@ -952,21 +950,19 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) * @throws PulsarAdminException * Unexpected error */ - Message getMessageById(String topic, String subName, long ledgerId, long entryId) throws PulsarAdminException; + Message getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException; /** * Get a message by its messageId via a topic subscription asynchronously * @param topic * Topic name - * @param subName - * Subscription name * @param ledgerId * Ledger id * @param entryId * Entry id * @return a future that can be used to track when the message is returned */ - CompletableFuture> getMessageByIdAsync(String topic, String subName, long ledgerId, long entryId); + CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId); /** * Create a new subscription on a topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index fb0aab096a7bc..0099deba5ebe0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -809,9 +809,9 @@ private void peekMessagesAsync(String topic, String subName, int numMessages, } @Override - public CompletableFuture> getMessageByIdAsync(String topic, String subName, long ledgerId, long entryId) { + public CompletableFuture> getMessageByIdAsync(String topic, long ledgerId, long entryId) { CompletableFuture> future = new CompletableFuture<>(); - getRemoteMessageById(topic, subName, ledgerId, entryId).handle((r, ex) -> { + getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> { if (ex != null) { if (ex instanceof NotFoundException) { log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage()); @@ -827,11 +827,9 @@ public CompletableFuture> getMessageByIdAsync(String topic, Stri return future; } - private CompletableFuture> getRemoteMessageById(String topic, String subName, - long ledgerId, long entryId) { + private CompletableFuture> getRemoteMessageById(String topic, long ledgerId, long entryId) { TopicName topicName = validateTopic(topic); - WebTarget path = topicPath(topicName, "subscription", subName, - "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); + WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId)); final CompletableFuture> future = new CompletableFuture<>(); asyncGetRequest(path, new InvocationCallback() { @@ -853,10 +851,10 @@ public void failed(Throwable throwable) { } @Override - public Message getMessageById(String topic, String subName, long ledgerId, long entryId) + public Message getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException { try { - return getMessageByIdAsync(topic, subName, ledgerId, entryId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + return getMessageByIdAsync(topic, ledgerId, entryId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 2b2d42f096404..6c7b9b4af2c38 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -570,10 +570,6 @@ private class GetMessageById extends CliCommand { @Parameter(description = "persistent://property/cluster/namespace/topic", required = true) private java.util.List params; - @Parameter(names = { "-s", - "--subscription" }, description = "Subscription to get messages from", required = true) - private String subName; - @Parameter(names = { "-l", "--ledgerId" }, description = "ledger id pointing to the desired ledger", required = true) @@ -588,7 +584,7 @@ private class GetMessageById extends CliCommand { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - Message message = persistentTopics.getMessageById(persistentTopic, subName, ledgerId, entryId); + Message message = persistentTopics.getMessageById(persistentTopic, ledgerId, entryId); ByteBuf date = Unpooled.wrappedBuffer(message.getData()); System.out.println(ByteBufUtil.prettyHexDump(date)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index a63cc9246a1c9..051def0df2f65 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -614,10 +614,6 @@ private class GetMessageById extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; - @Parameter(names = { "-s", - "--subscription" }, description = "Subscription to get messages from", required = true) - private String subName; - @Parameter(names = { "-l", "--ledgerId" }, description = "ledger id pointing to the desired ledger", required = true) @@ -632,7 +628,7 @@ private class GetMessageById extends CliCommand { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - Message message = topics.getMessageById(persistentTopic, subName, ledgerId, entryId); + Message message = topics.getMessageById(persistentTopic, ledgerId, entryId); ByteBuf date = Unpooled.wrappedBuffer(message.getData()); System.out.println(ByteBufUtil.prettyHexDump(date)); From feced5036c8fcb2b743a898ca3d2152ccab6d1b4 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Sat, 15 Feb 2020 11:55:14 -0800 Subject: [PATCH 3/8] use ledger to get entry --- .../admin/impl/PersistentTopicsBase.java | 43 +++++++++++-------- .../pulsar/broker/service/Subscription.java | 2 - .../NonPersistentSubscription.java | 6 --- .../persistent/PersistentReplicator.java | 22 ---------- .../persistent/PersistentSubscription.java | 23 ---------- 5 files changed, 24 insertions(+), 72 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 480eed880bb99..9083cfe778fab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -20,6 +20,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.common.api.proto.PulsarApi; import static org.apache.pulsar.common.util.Codec.decode; @@ -37,7 +44,6 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -52,11 +58,6 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback; -import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.ManagedLedgerInfo; -import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -1406,31 +1407,35 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati protected Response internalGetMessageById(long ledgerId, long entryId, boolean authoritative) { verifyReadOperation(authoritative); - String subName = "get-message-id-" + UUID.randomUUID().toString(); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - + ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); Entry entry = null; try { - PersistentSubscription subscription = - (PersistentSubscription) topic.createSubscription(subName, InitialPosition.Earliest, false).get(); - entry = subscription.getMessageById(ledgerId, entryId).get(); + CompletableFuture future = new CompletableFuture<>(); + ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + future.complete(entry); + } + }, null); + + entry = future.get(1000, TimeUnit.MILLISECONDS); return generateResponseWithEntry(entry); } catch (NullPointerException npe) { throw new RestException(Status.NOT_FOUND, "Message not found"); } catch (Exception exception) { - log.error("[{}] Failed to get message with ledgerId {} entryId {} from {} {}", - clientAppId(), ledgerId, entryId, topicName, subName, exception); + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", + clientAppId(), ledgerId, entryId, topicName, exception); throw new RestException(exception); } finally { if (entry != null) { entry.release(); } - - try { - topic.unsubscribe(subName).get(); - } catch (Exception exception) { - log.error("Failed to unsubscribe from topic {} with subscription name {}", topicName, subName); - } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index e4c5a354b0f18..5f356f97088e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -80,8 +80,6 @@ default long getNumberOfEntriesDelayed() { CompletableFuture peekNthMessage(int messagePosition); - CompletableFuture getMessageById(long ledgerId, long entryId); - void expireMessages(int messageTTLInSeconds); void redeliverUnacknowledgedMessages(Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 23a0d7e05464a..652042000b6ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -249,12 +249,6 @@ public CompletableFuture peekNthMessage(int messagePosition) { return CompletableFuture.completedFuture(null);// TODO: throw exception } - @Override - public CompletableFuture getMessageById(long ledgerId, long entryId) { - // No-op - return CompletableFuture.completedFuture(null); // TODO: throw exception - } - @Override public long getNumberOfEntriesInBacklog() { // No-op diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index db355a8b6e306..2bc18ca384709 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -583,28 +583,6 @@ public void readEntryComplete(Entry entry, Object ctx) { return future; } - public CompletableFuture getMessageById(long ledgerId, long entryId) { - CompletableFuture future = new CompletableFuture<>(); - - if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Getting message at position {} {}", topicName, localCluster, remoteCluster, - ledgerId, entryId); - } - - cursor.asyncGetMessageById(ledgerId, entryId, IndividualDeletedEntries.Exclude, new ReadEntryCallback() { - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - - @Override - public void readEntryComplete(Entry entry, Object ctx) { - future.complete(entry); - } - }, null); - return future; - } - @Override public void deleteComplete(Object ctx) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index aeb22b631980d..9d8ef3a7f938a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -774,29 +774,6 @@ public void readEntryComplete(Entry entry, Object ctx) { return future; } - @Override - public CompletableFuture getMessageById(long ledgerId, long entryId) { - CompletableFuture future = new CompletableFuture<>(); - - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Getting message for ledgerId {} entryId {}", topicName, subName, ledgerId, entryId); - } - - cursor.asyncGetMessageById(ledgerId, entryId, IndividualDeletedEntries.Exclude, new ReadEntryCallback() { - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - - @Override - public void readEntryComplete(Entry entry, Object ctx) { - future.complete(entry); - } - }, null); - - return future; - } - @Override public long getNumberOfEntriesInBacklog() { return cursor.getNumberOfEntriesInBacklog(); From a7ff07042b02ba426df06bc60d818c7d38399006 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Sun, 16 Feb 2020 21:27:33 -0800 Subject: [PATCH 4/8] remove asyncGetMessageById --- .../apache/bookkeeper/mledger/ManagedCursor.java | 16 ---------------- .../mledger/impl/ManagedCursorImpl.java | 11 ----------- .../mledger/impl/ManagedCursorContainerTest.java | 5 ----- .../pulsar/broker/service/Subscription.java | 1 - 4 files changed, 33 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 5a0ea45524e44..ae0426935637c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -131,22 +131,6 @@ Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback, Object ctx); - - /** - * Asynchronously get a message by its id. - * - * @param ledgerId - * ledger id - * @param entryId - * entry id - * @param deletedEntries - * skip individual deleted entries - * @param callback - * @param ctx - */ - void asyncGetMessageById(long ledgerId, long entryId, IndividualDeletedEntries deletedEntries, - ReadEntryCallback callback, Object ctx); - /** * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index af3b34871c26c..158a2ce889a81 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -550,17 +550,6 @@ public void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, Rea } } - @Override - public void asyncGetMessageById(long ledgerId, long entryId, IndividualDeletedEntries deletedEntries, - ReadEntryCallback callback, Object ctx) { - if (isClosed()) { - callback.readEntryFailed(new ManagedLedgerException("Cursor was already closed"), ctx); - return; - } - - ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), callback, ctx); - } - @Override public List readEntriesOrWait(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 9e0f9eb6cad0d..21f5747919b2f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -274,11 +274,6 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea Object ctx) { } - @Override - public void asyncGetMessageById(long ledgerId, long entryId, IndividualDeletedEntries deletedEntries, - ReadEntryCallback callback, Object ctx) { - } - @Override public void setActive() { } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 5f356f97088e9..18c7c49ae37f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -25,7 +25,6 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot; From ec12604f2db0116bfd53ee621d4f8b8b348dbbdd Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Mon, 17 Feb 2020 17:24:18 -0800 Subject: [PATCH 5/8] use asyncResponse to generate response and add doc --- .../admin/impl/PersistentTopicsBase.java | 18 +++++++------- .../broker/admin/v1/PersistentTopics.java | 18 +++++++++----- .../broker/admin/v2/PersistentTopics.java | 13 +++++++--- site2/docs/admin-api-persistent-topics.md | 24 +++++++++++++++++++ 4 files changed, 56 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9083cfe778fab..ac626144849c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1404,7 +1404,8 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati } } - protected Response internalGetMessageById(long ledgerId, long entryId, boolean authoritative) { + protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId, + boolean authoritative) { verifyReadOperation(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); @@ -1415,23 +1416,24 @@ protected Response internalGetMessageById(long ledgerId, long entryId, boolean a ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); + asyncResponse.resume(new RestException(exception)); } @Override public void readEntryComplete(Entry entry, Object ctx) { - future.complete(entry); + try { + asyncResponse.resume(generateResponseWithEntry(entry)); + } catch (IOException exception) { + asyncResponse.resume(new RestException(exception)); + } } }, null); - - entry = future.get(1000, TimeUnit.MILLISECONDS); - return generateResponseWithEntry(entry); } catch (NullPointerException npe) { - throw new RestException(Status.NOT_FOUND, "Message not found"); + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Message not found")); } catch (Exception exception) { log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", clientAppId(), ledgerId, entryId, topicName, exception); - throw new RestException(exception); + asyncResponse.resume(new RestException(exception)); } finally { if (entry != null) { entry.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index cee9c66666d76..4699eddd8c993 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -545,12 +545,18 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara @ApiResponse(code = 403, message = "Don't java admin permission"), @ApiResponse(code = 404, message = "Topic, subscription or the messageId does not exist") }) - public Response getMessageByID(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @PathParam("ledgerId") Long ledgerId, @PathParam("entryId") Long entryId, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(property, cluster, namespace, encodedTopic); - return internalGetMessageById(ledgerId, entryId, authoritative); + public void getMessageByID(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, @PathParam("ledgerId") Long ledgerId, + @PathParam("entryId") Long entryId, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index c4bc135716ee3..ff2dc493616ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -897,7 +897,8 @@ public Response peekNthMessage( @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")}) - public Response getMessageById( + public void getMessageById( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -910,8 +911,14 @@ public Response getMessageById( @PathParam("entryId") long entryId, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - validateTopicName(tenant, namespace, encodedTopic); - return internalGetMessageById(ledgerId, entryId, authoritative); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @GET diff --git a/site2/docs/admin-api-persistent-topics.md b/site2/docs/admin-api-persistent-topics.md index b09040d098016..340e7c760ec6f 100644 --- a/site2/docs/admin-api-persistent-topics.md +++ b/site2/docs/admin-api-persistent-topics.md @@ -474,6 +474,30 @@ int numMessages = 1; admin.persistentTopics().peekMessages(topic, subName, numMessages); ``` +### Get message by ID + +It fetches the message with given ledger id and entry id. + +#### pulsar-admin + +```shell +$ ./bin/pulsar-admin topics get-message-by-id \ + persistent://public/default/my-topic \ + -l 10 -e 0 +``` + +#### REST API +{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/ledger/:ledgerId/entry/:entryId|operation/getMessageById} + +#### Java + +```java +String topic = "persistent://my-tenant/my-namespace/my-topic"; +long ledgerId = 10; +long entryId = 10; +admin.persistentTopics().getMessageById(topic, ledgerId, entryId); +``` + ### Skip messages It skips N messages for a specific subscription of a given topic. From 7f6d4b1dc7bac1df60306696f1f165f96f820bcd Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Mon, 17 Feb 2020 17:30:45 -0800 Subject: [PATCH 6/8] add doc to reference-pulsar-admin --- site2/docs/reference-pulsar-admin.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index c52f87bad31f3..649c6ada0dda8 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1714,6 +1714,7 @@ Subcommands * `expire-messages-all-subscriptions` * `peek-messages` * `reset-cursor` +* `get-message-by-id` ### `compact` @@ -2091,6 +2092,20 @@ Options |`-m`, `--messageId`| The messageId to reset back to (ledgerId:entryId). || +### `get-message-by-id` +Get message by ledger id and entry id + +Usage +```bash +$ pulsar-admin topics get-message-by-id topic options +``` + +Options +|Flag|Description|Default| +|---|---|---| +|`-l`, `--ledgerId`| The ledger id |0| +|`-e`, `--entryId`| The entry id |0| + ## `tenants` Operations for managing tenants From 9d1c1e63255a4873a76c019b18be0cd41d1f012b Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Mon, 17 Feb 2020 18:32:02 -0800 Subject: [PATCH 7/8] clean up method internalGetMessageById --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ac626144849c8..8d3a3a5443e5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1410,9 +1410,7 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); - Entry entry = null; try { - CompletableFuture future = new CompletableFuture<>(); ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { @@ -1434,10 +1432,6 @@ public void readEntryComplete(Entry entry, Object ctx) { log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", clientAppId(), ledgerId, entryId, topicName, exception); asyncResponse.resume(new RestException(exception)); - } finally { - if (entry != null) { - entry.release(); - } } } From b475e8fae562cc653c2c552cfba5269fe93fb652 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Tue, 18 Feb 2020 09:36:30 -0800 Subject: [PATCH 8/8] resolve conflicts --- .../broker/admin/impl/PersistentTopicsBase.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d58a6441faeb4..75b8813215694 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1515,6 +1515,10 @@ public void readEntryComplete(Entry entry, Object ctx) { asyncResponse.resume(generateResponseWithEntry(entry)); } catch (IOException exception) { asyncResponse.resume(new RestException(exception)); + } finally { + if (entry != null) { + entry.release(); + } } } }, null); @@ -1528,17 +1532,11 @@ public void readEntryComplete(Entry entry, Object ctx) { } protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) { - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } + verifyReadOperation(authoritative); // If the topic name is a partition name, no need to get partition topic metadata again if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed"); } - } - - protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) { - verifyReadOperation(authoritative); validateAdminAccessForSubscriber(subName, authoritative); if (!(getTopicReference(topicName) instanceof PersistentTopic)) { log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,