Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 2912][pulsar-admin] add get-message-by-id cmd into pulsar-admin #6331

Merged
merged 12 commits into from
Apr 22, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;

Expand All @@ -31,7 +38,6 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -52,11 +58,6 @@
import javax.ws.rs.core.StreamingOutput;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -1403,21 +1404,51 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati
}
}

protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
protected Response internalGetMessageById(long ledgerId, long entryId, boolean authoritative) {
verifyReadOperation(authoritative);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
Entry entry = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is needed any more.

try {
CompletableFuture<Entry> future = new CompletableFuture<>();
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
future.complete(entry);
}
}, null);

entry = future.get(1000, TimeUnit.MILLISECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being late in reviewing this pull request. I would suggest implementing this using AsyncResponse. We have tried to move away from using sync methods.

You can check internalCreateSubscription on how to use AsyncResponse.

protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,

return generateResponseWithEntry(entry);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
} catch (Exception exception) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, exception);
throw new RestException(exception);
} finally {
if (entry != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this finally block to line 1520

entry.release();
}
}
}

protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
verifyReadOperation(authoritative);
validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Skip messages on a non-persistent topic is not allowed");
}

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
PersistentReplicator repl = null;
PersistentSubscription sub = null;
Expand All @@ -1433,48 +1464,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
} else {
entry = sub.peekNthMessage(messagePosition).get();
}
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();

StreamingOutput stream = new StreamingOutput() {

@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
}
};

return responseBuilder.entity(stream).build();
return generateResponseWithEntry(entry);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
} catch (Exception exception) {
Expand All @@ -1488,6 +1478,57 @@ public void write(OutputStream output) throws IOException, WebApplicationExcepti
}
}

private void verifyReadOperation(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
}
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();

StreamingOutput stream = output -> {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
};

return responseBuilder.entity(stream).build();
}

protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,22 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara
return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}

@GET
@Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
@ApiOperation(hidden = true, value = "Get message by its messageId.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't java admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the messageId does not exist")
})
public Response getMessageByID(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@PathParam("ledgerId") Long ledgerId, @PathParam("entryId") Long entryId,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetMessageById(ledgerId, entryId, authoritative);
}

@GET
@Path("{property}/{cluster}/{namespace}/{topic}/backlog")
@ApiOperation(hidden = true, value = "Get estimated backlog for offline topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,36 @@ public Response peekNthMessage(
return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
@ApiOperation(value = "Get message by its messageId.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public Response getMessageById(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The ledger id", required = true)
@PathParam("ledgerId") long ledgerId,
@ApiParam(value = "The entry id", required = true)
@PathParam("entryId") long entryId,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetMessageById(ledgerId, entryId, authoritative);
}

@GET
@Path("{tenant}/{namespace}/{topic}/backlog")
@ApiOperation(value = "Get estimated backlog for offline topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,32 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages);

/**
* Get a message by its messageId via a topic subscription
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
*/
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
* Get a message by its messageId via a topic subscription asynchronously
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
*/
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);

/**
* Create a new subscription on a topic
*
Expand Down
Loading