From 6d736bb2b37658a9419e01a6ca2b414681069213 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 21 Jan 2022 11:32:54 +0800 Subject: [PATCH 1/3] Fix call sync method in async rest api for internalGetMessageById. --- .../admin/impl/PersistentTopicsBase.java | 81 +++++++++++-------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c009a16b1cfd2..bb22dff354b3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2383,42 +2383,55 @@ private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize, protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId, boolean authoritative) { - try { - // will redirect if the topic not owned by current broker - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES); - - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); - ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); - ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() { - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - asyncResponse.resume(new RestException(exception)); - } + // will redirect if the topic not owned by current broker + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) + .thenCompose(__ -> { + CompletableFuture ret; + if (topicName.isGlobal()) { + ret = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + ret = CompletableFuture.completedFuture(null); + } + return ret.thenCompose(ignore -> { + return getTopicReferenceAsync(topicName) + .thenAccept(topic -> { + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger(); + ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), + new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryFailed(ManagedLedgerException exception, + Object ctx) { + asyncResponse.resume(new RestException(exception)); + } - @Override - public void readEntryComplete(Entry entry, Object ctx) { - try { - asyncResponse.resume(generateResponseWithEntry(entry)); - } catch (IOException exception) { - asyncResponse.resume(new RestException(exception)); - } finally { - if (entry != null) { - entry.release(); - } + @Override + public void readEntryComplete(Entry entry, Object ctx) { + try { + asyncResponse.resume(generateResponseWithEntry(entry)); + } catch (IOException exception) { + asyncResponse.resume(new RestException(exception)); + } finally { + if (entry != null) { + entry.release(); + } + } + } + }, null); + }); + }); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + if (cause instanceof NullPointerException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Message not found")); + } else { + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", + clientAppId(), ledgerId, entryId, topicName, cause); + asyncResponse.resume(new RestException(cause)); } - } - }, null); - } catch (NullPointerException npe) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Message not found")); - } catch (Exception exception) { - log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", - clientAppId(), ledgerId, entryId, topicName, exception); - asyncResponse.resume(new RestException(exception)); - } + return null; + }); } protected CompletableFuture internalGetMessageIdByTimestamp(long timestamp, boolean authoritative) { From da042196a1da092d1a871773e25312847d629b9f Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 21 Jan 2022 18:07:00 +0800 Subject: [PATCH 2/3] drop check NullPointerException. --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index bb22dff354b3a..95241d2265374 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2423,13 +2423,9 @@ public void readEntryComplete(Entry entry, Object ctx) { }); }).exceptionally(ex -> { Throwable cause = ex.getCause(); - if (cause instanceof NullPointerException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Message not found")); - } else { - log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", - clientAppId(), ledgerId, entryId, topicName, cause); - asyncResponse.resume(new RestException(cause)); - } + log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}", + clientAppId(), ledgerId, entryId, topicName, cause); + asyncResponse.resume(new RestException(cause)); return null; }); } From 9baf4c8aff487e1b57cc81428dff8dbcfc2a8ce7 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 21 Jan 2022 19:42:38 +0800 Subject: [PATCH 3/3] updates --- .../admin/impl/PersistentTopicsBase.java | 53 +++++++++---------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 95241d2265374..b279daf6e2127 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2393,34 +2393,33 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId } else { ret = CompletableFuture.completedFuture(null); } - return ret.thenCompose(ignore -> { - return getTopicReferenceAsync(topicName) - .thenAccept(topic -> { - ManagedLedgerImpl ledger = - (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger(); - ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), - new AsyncCallbacks.ReadEntryCallback() { - @Override - public void readEntryFailed(ManagedLedgerException exception, - Object ctx) { - asyncResponse.resume(new RestException(exception)); - } + return ret; + }) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenAccept(topic -> { + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger(); + ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), + new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryFailed(ManagedLedgerException exception, + Object ctx) { + asyncResponse.resume(new RestException(exception)); + } - @Override - public void readEntryComplete(Entry entry, Object ctx) { - try { - asyncResponse.resume(generateResponseWithEntry(entry)); - } catch (IOException exception) { - asyncResponse.resume(new RestException(exception)); - } finally { - if (entry != null) { - entry.release(); - } - } - } - }, null); - }); - }); + @Override + public void readEntryComplete(Entry entry, Object ctx) { + try { + asyncResponse.resume(generateResponseWithEntry(entry)); + } catch (IOException exception) { + asyncResponse.resume(new RestException(exception)); + } finally { + if (entry != null) { + entry.release(); + } + } + } + }, null); }).exceptionally(ex -> { Throwable cause = ex.getCause(); log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",