Skip to content

Commit

Permalink
[improve] [broker] Do not print an Error log when responding to `HTTP…
Browse files Browse the repository at this point in the history
…-404` when calling `Admin API` and the topic does not exist. (apache#21995)
  • Loading branch information
poorbarcode authored Feb 18, 2024
1 parent baddda5 commit 48b4481
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,10 @@ protected static boolean isNotFoundException(Throwable ex) {
== Status.NOT_FOUND.getStatusCode();
}

protected static boolean isNot307And404Exception(Throwable ex) {
return !isRedirectException(ex) && !isNotFoundException(ex);
}

protected static String getTopicNotFoundErrorMessage(String topic) {
return String.format("Topic %s not found", topic);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private CompletableFuture<Void> validateOwnershipAndOperationAsync(boolean autho


protected boolean shouldPrintErrorLog(Throwable ex) {
return !isRedirectException(ex) && !isNotFoundException(ex);
return isNot307And404Exception(ex);
}

private static final Logger log = LoggerFactory.getLogger(SchemasResourceBase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void getInternalStats(
})
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -479,7 +479,7 @@ public void getListFromBundle(
}
asyncResponse.resume(topicList);
}).exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, ex);
}
Expand All @@ -488,7 +488,7 @@ public void getListFromBundle(
});
}
}).exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void getList(
internalGetListAsync(Optional.ofNullable(bundle))
.thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -150,7 +150,7 @@ public void getPartitionedTopicList(
.thenAccept(partitionedTopicList -> asyncResponse.resume(
filterSystemTopic(partitionedTopicList, includeSystemTopic)))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -335,7 +335,7 @@ public void createNonPartitionedTopic(
internalCreateNonPartitionedTopicAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -825,7 +825,7 @@ public void updatePartitionedTopic(
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}][{}] Failed to update partition to {}",
clientAppId(), topicName, numPartitions, ex);
}
Expand Down Expand Up @@ -934,7 +934,7 @@ public void getProperties(
internalGetPropertiesAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -970,7 +970,7 @@ public void updateProperties(
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1004,7 +1004,7 @@ public void removeProperties(
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to remove key {} in properties on topic {}",
clientAppId(), key, topicName, ex);
}
Expand Down Expand Up @@ -1125,7 +1125,7 @@ public void deleteTopic(
} else if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
} else if (!isRedirectException(ex)) {
} else if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1209,7 +1209,7 @@ public void getStats(
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get stats for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1243,7 +1243,7 @@ public void getInternalStats(
internalGetInternalStatsAsync(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -1892,7 +1892,7 @@ public void peekNthMessage(
internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(),
topicName, decode(encodedSubName), ex);
}
Expand Down Expand Up @@ -1934,7 +1934,7 @@ public void examineMessage(
internalExamineMessageAsync(initialPosition, messagePosition, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
ex);
}
Expand Down Expand Up @@ -1976,7 +1976,7 @@ public void getMessageById(
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, ex);
}
Expand Down Expand Up @@ -2020,7 +2020,7 @@ public void getMessageIdByTimestamp(
}
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get message ID by timestamp {} from {}",
clientAppId(), timestamp, topicName, ex);
}
Expand Down Expand Up @@ -2055,7 +2055,7 @@ public void getBacklog(
log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
namespaceName);
ex = new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
} else if (!isRedirectException(ex)) {
} else if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get estimated backlog for topic {}", clientAppId(), encodedTopic, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -3173,7 +3173,7 @@ public void terminate(
internalTerminateAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -3269,7 +3269,7 @@ public void compactionStatus(
internalCompactionStatusAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get the status of a compaction operation for the topic {}",
clientAppId(), topicName, ex);
}
Expand Down Expand Up @@ -3408,7 +3408,7 @@ public void trimTopic(
validateTopicName(tenant, namespace, encodedTopic);
internalTrimTopic(asyncResponse, authoritative).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to trim topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResp
Long.parseLong(leastSigBits))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction state in transaction buffer {}",
clientAppId(), topicName, ex);
}
Expand Down Expand Up @@ -143,7 +143,7 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async
Long.parseLong(leastSigBits), subName)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction state in pending ack {}",
clientAppId(), topicName, ex);
}
Expand Down Expand Up @@ -181,7 +181,7 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
internalGetTransactionBufferStats(authoritative, lowWaterMarks, segmentStats)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction buffer stats in topic {}",
clientAppId(), topicName, ex);
}
Expand Down Expand Up @@ -217,7 +217,7 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
internalGetPendingAckStats(authoritative, subName, lowWaterMarks)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction pending ack stats in topic {}",
clientAppId(), topicName, ex);
}
Expand Down Expand Up @@ -314,7 +314,7 @@ public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncRespo
internalGetPendingAckInternalStats(authoritative, subName, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get pending ack internal stats {}",
clientAppId(), topicName, ex);
}
Expand Down Expand Up @@ -365,7 +365,7 @@ public void getTransactionBufferInternalStats(@Suspended final AsyncResponse asy
internalGetTransactionBufferInternalStats(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get transaction buffer internal stats {}",
clientAppId(), topicName, ex);
}
Expand Down

0 comments on commit 48b4481

Please sign in to comment.