diff --git a/bouncy-castle/bc/pom.xml b/bouncy-castle/bc/pom.xml
index c9b159abf62c1..1b5c28a6a7fd0 100644
--- a/bouncy-castle/bc/pom.xml
+++ b/bouncy-castle/bc/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
bouncy-castle-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/bouncy-castle/bcfips-include-test/pom.xml b/bouncy-castle/bcfips-include-test/pom.xml
index c1892ebaf9886..578e7b56c9a81 100644
--- a/bouncy-castle/bcfips-include-test/pom.xml
+++ b/bouncy-castle/bcfips-include-test/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
bouncy-castle-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/bouncy-castle/bcfips/pom.xml b/bouncy-castle/bcfips/pom.xml
index e5874a8038d0f..85f34a4556909 100644
--- a/bouncy-castle/bcfips/pom.xml
+++ b/bouncy-castle/bcfips/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
bouncy-castle-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/bouncy-castle/pom.xml b/bouncy-castle/pom.xml
index 7f2b96e6a9a1f..da23d3775e7e3 100644
--- a/bouncy-castle/pom.xml
+++ b/bouncy-castle/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 35a1db6bc60c3..37130271def02 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -31,7 +31,7 @@
org.apache.pulsar
buildtools
- 2.10.6-SNAPSHOT
+ 2.10.6
jar
Pulsar Build Tools
diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml
index ad0af3763c167..95cbeb9b510aa 100644
--- a/distribution/io/pom.xml
+++ b/distribution/io/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
distribution
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/distribution/offloaders/pom.xml b/distribution/offloaders/pom.xml
index 504bf5fc3a4d0..4c539ee71161f 100644
--- a/distribution/offloaders/pom.xml
+++ b/distribution/offloaders/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
distribution
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 8c8e683097c60..69521c814e77b 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index c3254faf281f7..dca89684cf025 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
distribution
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/docker/pom.xml b/docker/pom.xml
index 1df9ebfdd2cc6..0a5f85f6c5f8e 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
docker-images
Apache Pulsar :: Docker Images
diff --git a/docker/pulsar-all/pom.xml b/docker/pulsar-all/pom.xml
index da806408d2ef5..318815f2eac14 100644
--- a/docker/pulsar-all/pom.xml
+++ b/docker/pulsar-all/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
docker-images
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
pulsar-all-docker-image
diff --git a/docker/pulsar/pom.xml b/docker/pulsar/pom.xml
index c98f610b198b6..24259a73aeb06 100644
--- a/docker/pulsar/pom.xml
+++ b/docker/pulsar/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
docker-images
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
pulsar-docker-image
diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
index 7fd4e19361074..0160ae0ce66e5 100644
--- a/jclouds-shaded/pom.xml
+++ b/jclouds-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/kafka-connect-avro-converter-shaded/pom.xml b/kafka-connect-avro-converter-shaded/pom.xml
index ede99b19e0621..f38bc7d0c4187 100644
--- a/kafka-connect-avro-converter-shaded/pom.xml
+++ b/kafka-connect-avro-converter-shaded/pom.xml
@@ -26,7 +26,7 @@
pulsar
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 8739e94bd51d2..d3411be3ad975 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pom.xml b/pom.xml
index 1de3ba2270b5e..1a12ff8109d56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
Pulsar
Pulsar is a distributed pub-sub messaging platform with a very
diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml
index 1b94ad5d255da..2b3540f7ee581 100644
--- a/pulsar-broker-auth-athenz/pom.xml
+++ b/pulsar-broker-auth-athenz/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-broker-auth-athenz
diff --git a/pulsar-broker-auth-sasl/pom.xml b/pulsar-broker-auth-sasl/pom.xml
index 062830aa94bb7..6cb359b07446b 100644
--- a/pulsar-broker-auth-sasl/pom.xml
+++ b/pulsar-broker-auth-sasl/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-broker-auth-sasl
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index d7a2e0556c613..e1e3d0d87bc04 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-broker-common
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index ee920cc79a90c..53a7db3cf6057 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index b14afae47fe57..e33f5a683758b 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 30526a0787d24..f64e1d94507e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -798,6 +798,17 @@ protected static boolean isRedirectException(Throwable ex) {
== Status.TEMPORARY_REDIRECT.getStatusCode();
}
+ protected static boolean isNotFoundException(Throwable ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ return realCause instanceof WebApplicationException
+ && ((WebApplicationException) realCause).getResponse().getStatus()
+ == Status.NOT_FOUND.getStatusCode();
+ }
+
+ protected static boolean isNot307And404Exception(Throwable ex) {
+ return !isRedirectException(ex) && !isNotFoundException(ex);
+ }
+
protected static String getTopicNotFoundErrorMessage(String topic) {
return String.format("Topic %s not found", topic);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 867f45a5e3ccc..82711096701f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -838,7 +838,7 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -848,7 +848,7 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -1058,7 +1058,7 @@ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse,
}))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1080,7 +1080,7 @@ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon
}))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to unload tc {},{}", clientAppId(),
topicName.getPartitionIndex(), ex);
}
@@ -1206,7 +1206,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned topic metadata while get"
+ " subscriptions for topic {}", clientAppId(), topicName, ex);
}
@@ -1216,7 +1216,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace/topic ownership while get subscriptions"
+ " for topic {}", clientAppId(), topicName, ex);
}
@@ -1225,7 +1225,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get subscriptions for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1264,7 +1264,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR
.thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys())))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1376,7 +1376,7 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned metadata while get managed info for {}",
clientAppId(), topicName, ex);
}
@@ -1386,7 +1386,7 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace ownership while get managed info for {}",
clientAppId(), topicName, ex);
}
@@ -1488,7 +1488,7 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1542,7 +1542,7 @@ protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse,
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned internal stats for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1656,8 +1656,9 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
"Subscription has active connected consumers"));
} else {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, cause);
+ if (!isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to analyze subscription backlog {} {}",
+ clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
}
@@ -1683,7 +1684,7 @@ private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncRes
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} {}", clientAppId(), topicName, subName, cause);
}
asyncResponse.resume(new RestException(cause));
@@ -1850,7 +1851,7 @@ protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subNa
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -1893,7 +1894,7 @@ private CompletableFuture internalSkipAllMessagesForNonPartitionedTopicAsy
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -1955,7 +1956,7 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), numMessages, topicName,
subName, ex);
}
@@ -2022,7 +2023,7 @@ protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResp
)
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName,
ex);
}
@@ -2085,7 +2086,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages for all subscription up to {} on {}", clientAppId(),
expireTimeInSeconds, topicName, ex);
}
@@ -2290,7 +2291,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to create subscription {} on topic {}",
clientAppId(), subscriptionName, topicName, ex);
}
@@ -2300,7 +2301,7 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to create subscription {} on topic {}",
clientAppId(), subscriptionName, topicName, ex);
}
@@ -2431,7 +2432,7 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to update subscription {} from topic {}",
clientAppId(), subName, topicName, ex);
}
@@ -2512,7 +2513,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
clientAppId(), topicName, subName, messageId, ex.getCause());
}
@@ -2521,7 +2522,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
clientAppId(), topicName, subName, messageId, ex.getCause());
}
@@ -2619,60 +2620,65 @@ private PositionImpl calculatePositionAckSet(boolean isExcluded, int batchSize,
return seekPosition;
}
- protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
- boolean authoritative) {
- // will redirect if the topic not owned by current broker
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
- .thenCompose(__ -> {
- CompletableFuture ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
- } else {
- ret = CompletableFuture.completedFuture(null);
- }
- return ret;
- })
- .thenCompose(__ -> getTopicReferenceAsync(topicName))
- .thenAccept(topic -> {
- ManagedLedgerImpl ledger =
- (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger();
- ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
- new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryFailed(ManagedLedgerException exception,
- Object ctx) {
- asyncResponse.resume(new RestException(exception));
- }
+ protected CompletableFuture internalGetMessageById(long ledgerId, long entryId, boolean authoritative) {
+ CompletableFuture future;
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+ return future.thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenAccept(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported getMessageById operation on partitioned-topic {}",
+ clientAppId(), topicName);
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "GetMessageById is not allowed on partitioned-topic");
+ }
+ });
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- try {
- asyncResponse.resume(generateResponseWithEntry(entry));
- } catch (IOException exception) {
- asyncResponse.resume(new RestException(exception));
- } finally {
- if (entry != null) {
- entry.release();
- }
- }
- }
+ }
+ })
+ .thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ CompletableFuture results = new CompletableFuture<>();
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) ((PersistentTopic) topic).getManagedLedger();
+ ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
+ new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception,
+ Object ctx) {
+ throw new RestException(exception);
+ }
- @Override
- public String toString() {
- return String.format("Topic [%s] internal get message by id",
- PersistentTopicsBase.this.topicName);
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ try {
+ results.complete(generateResponseWithEntry(entry));
+ } catch (IOException exception) {
+ throw new RestException(exception);
+ } finally {
+ if (entry != null) {
+ entry.release();
}
- }, null);
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
- clientAppId(), ledgerId, entryId, topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
- });
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Topic [%s] internal get message by id",
+ PersistentTopicsBase.this.topicName);
+ }
+ }, null);
+ return results;
+ });
}
protected CompletableFuture internalGetMessageIdByTimestamp(long timestamp, boolean authoritative) {
@@ -3064,7 +3070,7 @@ protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse,
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get backlog size for topic {}", clientAppId(),
topicName, ex);
}
@@ -3074,7 +3080,7 @@ protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse,
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get backlog size for topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3082,7 +3088,7 @@ protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse,
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate global namespace ownership to get backlog size for topic "
+ "{}", clientAppId(), topicName, ex);
}
@@ -3594,7 +3600,7 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3602,7 +3608,7 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo
})
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3683,7 +3689,7 @@ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, St
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(),
expireTimeInSeconds, topicName, ex);
}
@@ -3793,7 +3799,7 @@ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, Str
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}",
clientAppId(), topicName, subName, messageId, ex);
}
@@ -3948,7 +3954,7 @@ protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean au
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger compaction on topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -3957,7 +3963,7 @@ protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean au
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate global namespace ownership to trigger compaction on topic {}",
clientAppId(), topicName, ex);
}
@@ -3986,7 +3992,7 @@ protected void internalTriggerCompactionNonPartitionedTopic(AsyncResponse asyncR
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger compaction for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4023,7 +4029,7 @@ protected void internalTriggerOffload(AsyncResponse asyncResponse,
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to trigger offload for {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4040,7 +4046,7 @@ protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean author
asyncResponse.resume(offloadProcessStatus);
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to offload status on topic {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4513,7 +4519,7 @@ protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean aut
});
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get last messageId {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -4774,9 +4780,7 @@ protected CompletableFuture internalRemoveSubscribeRate(boolean isGlobal)
protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
Throwable cause = thr.getCause();
- if (!(cause instanceof WebApplicationException) || !(
- ((WebApplicationException) cause).getResponse().getStatus() == 307
- || ((WebApplicationException) cause).getResponse().getStatus() == 404)) {
+ if (isNot307And404Exception(cause)) {
log.error("[{}] Failed to perform {} on topic {}",
clientAppId(), methodName, topicName, cause);
}
@@ -4938,7 +4942,7 @@ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
resultFuture.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
topicName, subName, ex);
}
@@ -4983,7 +4987,7 @@ private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(
}
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to set replicated subscription status on {} {}", clientAppId(),
topicName, subName, ex);
}
@@ -5085,7 +5089,7 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
+ if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
topicName, subName, ex);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 2009de113d2a4..f246b00419d9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -720,14 +720,18 @@ public void getMessageByID(@Suspended final AsyncResponse asyncResponse, @PathPa
@PathParam("topic") @Encoded String encodedTopic, @PathParam("ledgerId") Long ledgerId,
@PathParam("entryId") Long entryId,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- try {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalGetMessageById(ledgerId, entryId, authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
+ clientAppId(), ledgerId, entryId, topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 87cc6650bb902..b75ec765796e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -498,19 +498,19 @@ public void getListFromBundle(
}
asyncResponse.resume(topicList);
} catch (Exception e) {
- log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
- namespaceName, bundleRange, e);
+ if (!isNot307And404Exception(e)) {
+ log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
+ namespaceName, bundleRange, e);
+ }
asyncResponse.resume(new RestException(e));
}
}
}).exceptionally(ex -> {
- log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
- namespaceName, bundleRange, ex);
- if (ex.getCause() instanceof WebApplicationException) {
- asyncResponse.resume(ex.getCause());
- } else {
- asyncResponse.resume(new RestException(ex.getCause()));
+ if (!isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
+ namespaceName, bundleRange, ex);
}
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index afa3f4495daa1..4dd4c1310cc62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -68,6 +68,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -105,7 +106,7 @@ public void getList(
internalGetListAsync(Optional.ofNullable(bundle))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -347,7 +348,8 @@ public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -370,7 +372,9 @@ public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperation(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -392,7 +396,8 @@ public void removeOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -416,7 +421,8 @@ public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse async
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
@@ -440,7 +446,8 @@ public void setMaxUnackedMessagesOnConsumer(
@ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -462,7 +469,8 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -485,7 +493,8 @@ public void getDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyn
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
.thenAccept(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
@@ -513,7 +522,8 @@ public void setDeduplicationSnapshotInterval(
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -535,7 +545,8 @@ public void deleteDeduplicationSnapshotInterval(@Suspended final AsyncResponse a
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -559,7 +570,8 @@ public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
@@ -582,7 +594,8 @@ public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons
@ApiParam(value = "inactive topic policies for the specified topic")
InactiveTopicPolicies inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -604,7 +617,8 @@ public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResp
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.INACTIVE_TOPIC, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -628,7 +642,8 @@ public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse a
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -703,7 +718,8 @@ public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncRespo
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -907,7 +923,7 @@ public void getProperties(
internalGetPropertiesAsync(authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
- if (!isRedirectException(ex)) {
+ if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to get topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1669,6 +1685,7 @@ public Response examineMessage(
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
return internalExamineMessage(initialPosition, messagePosition, authoritative);
}
@@ -1699,14 +1716,18 @@ public void getMessageById(
@PathParam("entryId") long entryId,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- try {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetMessageById(ledgerId, entryId, authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ // If the exception is not redirect exception we need to log it.
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
+ clientAppId(), ledgerId, entryId, topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -1744,9 +1765,11 @@ public void getMessageIdByTimestamp(
}
})
.exceptionally(ex -> {
- log.error("[{}] Failed to get message ID by timestamp {} from {}",
- clientAppId(), timestamp, topicName, ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
+ if (isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to get message ID by timestamp {} from {}",
+ clientAppId(), timestamp, topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
return null;
});
}
@@ -1768,6 +1791,7 @@ public PersistentOfflineTopicStats getBacklog(
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
+ validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
return internalGetBacklog(authoritative);
}
@@ -1811,7 +1835,8 @@ public void getBacklogQuotaMap(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetBacklogQuota(applied, isGlobal))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
@@ -1889,7 +1914,8 @@ public void getReplicationClusters(@Suspended final AsyncResponse asyncResponse,
+ "For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenAccept(op -> {
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(() -> {
@@ -1971,7 +1997,8 @@ public void getMessageTTL(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
.thenAccept(op -> asyncResponse.resume(op
.map(TopicPolicies::getMessageTTLInSeconds)
@@ -2008,7 +2035,8 @@ public void setMessageTTL(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMessageTTL(messageTTL, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2035,7 +2063,8 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.TTL, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMessageTTL(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2061,7 +2090,8 @@ public void getDeduplication(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDeduplication(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2088,7 +2118,8 @@ public void setDeduplication(
@ApiParam(value = "DeduplicationEnabled policies for the specified topic")
Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplication(enabled, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2113,7 +2144,8 @@ public void removeDeduplication(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDeduplication(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
@@ -2237,7 +2269,8 @@ public void getPersistence(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetPersistence(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2265,7 +2298,8 @@ public void setPersistence(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Bookkeeper persistence policies for specified topic")
PersistencePolicies persistencePolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetPersistence(persistencePolicies, isGlobal))
.thenRun(() -> {
try {
@@ -2301,7 +2335,8 @@ public void removePersistence(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.PERSISTENCE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemovePersistence(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
@@ -2332,7 +2367,8 @@ public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -2360,7 +2396,8 @@ public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
@@ -2390,7 +2427,8 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
@@ -2420,7 +2458,8 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2447,7 +2486,8 @@ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
@@ -2477,7 +2517,8 @@ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncRes
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION_RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
@@ -2507,7 +2548,8 @@ public void getMaxProducers(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxProducers(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2534,7 +2576,8 @@ public void setMaxProducers(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "The max producers of the topic") int maxProducers) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxProducers(maxProducers, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
@@ -2566,7 +2609,8 @@ public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxProducers(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
@@ -2598,7 +2642,8 @@ public void getMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxConsumers(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -2625,7 +2670,8 @@ public void setMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max consumers of the topic") int maxConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxConsumers(maxConsumers, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
@@ -2657,7 +2703,8 @@ public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxConsumers(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
@@ -2688,7 +2735,8 @@ public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxMessageSize(isGlobal))
.thenAccept(policies -> {
asyncResponse.resume(policies.isPresent() ? policies.get() : Response.noContent().build());
@@ -2717,7 +2765,8 @@ public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max message size of the topic") int maxMessageSize) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize, isGlobal))
.thenRun(() -> {
log.info(
@@ -2751,7 +2800,8 @@ public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxMessageSize(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
@@ -3002,7 +3052,8 @@ public void getDispatchRate(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3028,7 +3079,8 @@ public void setDispatchRate(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
try {
@@ -3064,7 +3116,8 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveDispatchRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
@@ -3097,7 +3150,8 @@ public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResp
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3125,7 +3179,8 @@ public void setSubscriptionDispatchRate(
@ApiParam(value = "Subscription message dispatch rate for the specified topic")
DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
try {
@@ -3161,7 +3216,8 @@ public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncR
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscriptionDispatchRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
@@ -3194,7 +3250,8 @@ public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetCompactionThreshold(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
@@ -3220,7 +3277,8 @@ public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold, isGlobal))
.thenRun(() -> {
try {
@@ -3256,7 +3314,8 @@ public void removeCompactionThreshold(@Suspended final AsyncResponse asyncRespon
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.COMPACTION, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
@@ -3288,7 +3347,8 @@ public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncR
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetMaxConsumersPerSubscription(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -3316,7 +3376,8 @@ public void setMaxConsumersPerSubscription(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
.thenRun(() -> {
try {
@@ -3352,7 +3413,8 @@ public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asy
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveMaxConsumersPerSubscription(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic max consumers per subscription:"
@@ -3385,7 +3447,8 @@ public void getPublishRate(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetPublishRate(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
@@ -3412,7 +3475,8 @@ public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal))
.thenRun(() -> {
try {
@@ -3449,7 +3513,8 @@ public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemovePublishRate(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}",
@@ -3482,7 +3547,8 @@ public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscriptionTypesEnabled(isGlobal))
.thenAccept(op -> {
asyncResponse.resume(op.isPresent() ? op.get()
@@ -3512,7 +3578,8 @@ public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp
@ApiParam(value = "Enable sub types for the specified topic")
Set subscriptionTypesEnabled) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled, isGlobal))
.thenRun(() -> {
try {
@@ -3548,7 +3615,8 @@ public void removeSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncR
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscriptionTypesEnabled(isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove subscription types enabled: namespace={}, topic={}",
@@ -3580,7 +3648,8 @@ public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
@@ -3606,7 +3675,8 @@ public void setSubscribeRate(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetSubscribeRate(subscribeRate, isGlobal))
.thenRun(() -> {
try {
@@ -3644,7 +3714,8 @@ public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
- preValidation(authoritative)
+ validateTopicPolicyOperationAsync(topicName, PolicyName.RATE, PolicyOperation.WRITE)
+ .thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveSubscribeRate(isGlobal))
.thenRun(() -> {
log.info(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 9cb825b9f8e1f..a0faeb654596a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -90,6 +90,10 @@ public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResp
Long.parseLong(leastSigBits))
.thenAccept(stat -> asyncResponse.resume(stat))
.exceptionally(ex -> {
+ if (!isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to get transaction state in transaction buffer {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -124,6 +128,10 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async
Long.parseLong(leastSigBits), subName)
.thenAccept(stat -> asyncResponse.resume(stat))
.exceptionally(ex -> {
+ if (!isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to get transaction state in pending ack {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -154,6 +162,10 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
internalGetTransactionBufferStats(authoritative)
.thenAccept(stat -> asyncResponse.resume(stat))
.exceptionally(ex -> {
+ if (!isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to get transaction buffer stats in topic {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -185,6 +197,10 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
internalGetPendingAckStats(authoritative, subName)
.thenAccept(stats -> asyncResponse.resume(stats))
.exceptionally(ex -> {
+ if (!isNot307And404Exception(ex)) {
+ log.error("[{}] Failed to get transaction pending ack stats in topic {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
index 2a2f329d5f47e..d63da5b7718aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesAuthZTest.java
@@ -38,6 +38,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -46,7 +48,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
public final class TopicPoliciesAuthZTest extends MockedPulsarServiceBaseTest {
private PulsarAdmin superUserAdmin;
@@ -219,4 +220,207 @@ public void testRetention() {
superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
}
}
+
+ @SneakyThrows
+ @Test
+ public void testMaxUnackedMessagesOnConsumer() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ int definedUnackedMessagesOnConsumer = 100;
+
+ // test superuser
+ superUserAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+
+ // because the topic policies is eventual consistency, we should wait here
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Sets.newHashSet(action));
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnConsumer(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+ superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ public void testMaxUnackedMessagesOnSubscription() {
+ final String random = UUID.randomUUID().toString();
+ final String topic = "persistent://public/default/" + random;
+ final String subject = UUID.randomUUID().toString();
+ final String token = Jwts.builder()
+ .claim("sub", subject).signWith(SECRET_KEY).compact();
+ superUserAdmin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsar.getWebServiceAddress())
+ .authentication(new AuthenticationToken(token))
+ .build();
+
+ // mocked data
+ int definedUnackedMessagesOnConsumer = 100;
+
+ // test superuser
+ superUserAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+
+ // because the topic policies is eventual consistency, we should wait here
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ superUserAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = superUserAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test tenant manager
+
+ tenantManagerAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+ await().untilAsserted(() -> {
+ final int unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertEquals(unackedMessagesOnConsumer, definedUnackedMessagesOnConsumer);
+ });
+ tenantManagerAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ await().untilAsserted(() -> {
+ final Integer unackedMessagesOnConsumer = tenantManagerAdmin.topicPolicies()
+ .getMaxUnackedMessagesOnSubscription(topic);
+ Assert.assertNull(unackedMessagesOnConsumer);
+ });
+
+ // test nobody
+
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ // test sub user with permissions
+ for (AuthAction action : AuthAction.values()) {
+ superUserAdmin.namespaces().grantPermissionOnNamespace("public/default",
+ subject, Sets.newHashSet(action));
+ try {
+ subAdmin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+
+ subAdmin.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, definedUnackedMessagesOnConsumer);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+
+ try {
+ subAdmin.topicPolicies().removeMaxUnackedMessagesOnSubscription(topic);
+ Assert.fail("unexpected behaviour");
+ } catch (PulsarAdminException ex) {
+ Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException);
+ }
+ superUserAdmin.namespaces().revokePermissionsOnNamespace("public/default", subject);
+ }
+
+ }
}
diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml
index 5c0f7c83947e8..a795409bc34a6 100644
--- a/pulsar-client-1x-base/pom.xml
+++ b/pulsar-client-1x-base/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
index 6d8b2df4d916b..0f1b7caef79ae 100644
--- a/pulsar-client-1x-base/pulsar-client-1x/pom.xml
+++ b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-client-1x-base
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
index 9526af73fc23b..6ab11f3a39cc4 100644
--- a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
+++ b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-client-1x-base
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-admin-api/pom.xml b/pulsar-client-admin-api/pom.xml
index 9ff21d3fdaecb..326a6e64adab6 100644
--- a/pulsar-client-admin-api/pom.xml
+++ b/pulsar-client-admin-api/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index 0f73854bc7b9b..3d45d3047eeb7 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index 4061ef0718d1f..51ea727388307 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 9d66319f0f6bd..0bd7e822fa668 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-api/pom.xml b/pulsar-client-api/pom.xml
index 50a693334ccc1..12700ceba27cc 100644
--- a/pulsar-client-api/pom.xml
+++ b/pulsar-client-api/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-auth-athenz/pom.xml b/pulsar-client-auth-athenz/pom.xml
index a6defb7f34f69..523dc8c23f6cf 100644
--- a/pulsar-client-auth-athenz/pom.xml
+++ b/pulsar-client-auth-athenz/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-auth-sasl/pom.xml b/pulsar-client-auth-sasl/pom.xml
index 955698272031d..c78e06caece61 100644
--- a/pulsar-client-auth-sasl/pom.xml
+++ b/pulsar-client-auth-sasl/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-messagecrypto-bc/pom.xml b/pulsar-client-messagecrypto-bc/pom.xml
index 70f38afca1999..7f87fe122cd31 100644
--- a/pulsar-client-messagecrypto-bc/pom.xml
+++ b/pulsar-client-messagecrypto-bc/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index cf34e6f1d5091..689733d659447 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-tools-test/pom.xml b/pulsar-client-tools-test/pom.xml
index 85c532ac1c3bb..c723143917fee 100644
--- a/pulsar-client-tools-test/pom.xml
+++ b/pulsar-client-tools-test/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 2ec40e7f7b1fd..068f8c744bf54 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index b2124d9293c8d..9b19613835d5d 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 6d95bd5025867..2f3e48ce2be85 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-config-validation/pom.xml b/pulsar-config-validation/pom.xml
index 080b78095bccc..c02d88d4aa1af 100644
--- a/pulsar-config-validation/pom.xml
+++ b/pulsar-config-validation/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml
index efa510d4d105e..a9406921b1b7f 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions-api
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index ae16da8e44bfb..ee8c5374bb182 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions-instance
diff --git a/pulsar-functions/java-examples/pom.xml b/pulsar-functions/java-examples/pom.xml
index 9eeffc0186e84..98cd1114db58f 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions-api-examples
diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
index 82a792459e259..baf2b72e00178 100644
--- a/pulsar-functions/localrun-shaded/pom.xml
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-functions/localrun/pom.xml b/pulsar-functions/localrun/pom.xml
index b80e369820e77..81a2550d6f196 100644
--- a/pulsar-functions/localrun/pom.xml
+++ b/pulsar-functions/localrun/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 6ee467025e564..895ab9bef46d8 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions
diff --git a/pulsar-functions/proto/pom.xml b/pulsar-functions/proto/pom.xml
index 4aee26b900b7c..aeeb39a5c6183 100644
--- a/pulsar-functions/proto/pom.xml
+++ b/pulsar-functions/proto/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions-proto
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index 7544725021f0f..1045fadd29e5b 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 051a9aacd9ed6..0154e62233eac 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions-runtime
diff --git a/pulsar-functions/secrets/pom.xml b/pulsar-functions/secrets/pom.xml
index 801a5b0848de4..4c2c201165dce 100644
--- a/pulsar-functions/secrets/pom.xml
+++ b/pulsar-functions/secrets/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions-secrets
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 0c08364a68d34..2a0d035a6cf0f 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-functions-utils
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 1476a8de4394a..f83b6b17f2c12 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-io/aerospike/pom.xml b/pulsar-io/aerospike/pom.xml
index 549067c8ffe36..771c909507b43 100644
--- a/pulsar-io/aerospike/pom.xml
+++ b/pulsar-io/aerospike/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-aerospike
diff --git a/pulsar-io/aws/pom.xml b/pulsar-io/aws/pom.xml
index 724835d80f8ad..d2c42408cc2d2 100644
--- a/pulsar-io/aws/pom.xml
+++ b/pulsar-io/aws/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-aws
diff --git a/pulsar-io/batch-data-generator/pom.xml b/pulsar-io/batch-data-generator/pom.xml
index e4a8f3f8c6c15..bb4fa445bbca3 100644
--- a/pulsar-io/batch-data-generator/pom.xml
+++ b/pulsar-io/batch-data-generator/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-batch-data-generator
diff --git a/pulsar-io/batch-discovery-triggerers/pom.xml b/pulsar-io/batch-discovery-triggerers/pom.xml
index ef14e98973c8c..82672d464cd58 100644
--- a/pulsar-io/batch-discovery-triggerers/pom.xml
+++ b/pulsar-io/batch-discovery-triggerers/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-batch-discovery-triggerers
diff --git a/pulsar-io/canal/pom.xml b/pulsar-io/canal/pom.xml
index e61e8a0567753..bb41b142cf6c6 100644
--- a/pulsar-io/canal/pom.xml
+++ b/pulsar-io/canal/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-io/cassandra/pom.xml b/pulsar-io/cassandra/pom.xml
index 8c796b5fb9b5e..7a8d5795bf790 100644
--- a/pulsar-io/cassandra/pom.xml
+++ b/pulsar-io/cassandra/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-cassandra
diff --git a/pulsar-io/common/pom.xml b/pulsar-io/common/pom.xml
index 8185bbcfca16f..3c9bb46d20231 100644
--- a/pulsar-io/common/pom.xml
+++ b/pulsar-io/common/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-common
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index a5d13db1df6cf..d341e110927d0 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-core
diff --git a/pulsar-io/data-generator/pom.xml b/pulsar-io/data-generator/pom.xml
index d9445778bdd07..6a95d07e1b54d 100644
--- a/pulsar-io/data-generator/pom.xml
+++ b/pulsar-io/data-generator/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-data-generator
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index de45a6171edcb..46b7cdd592d80 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-debezium-core
diff --git a/pulsar-io/debezium/mongodb/pom.xml b/pulsar-io/debezium/mongodb/pom.xml
index 0f9f5cc43d86a..0a01f218a41f3 100644
--- a/pulsar-io/debezium/mongodb/pom.xml
+++ b/pulsar-io/debezium/mongodb/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-debezium-mongodb
diff --git a/pulsar-io/debezium/mssql/pom.xml b/pulsar-io/debezium/mssql/pom.xml
index e2d9bf0125de6..c076715fbc787 100644
--- a/pulsar-io/debezium/mssql/pom.xml
+++ b/pulsar-io/debezium/mssql/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-debezium-mssql
diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml
index 05a7410c0567d..03e396cfaa8f5 100644
--- a/pulsar-io/debezium/mysql/pom.xml
+++ b/pulsar-io/debezium/mysql/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-debezium-mysql
diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml
index d1da410bde301..d7fefcd5a0d6b 100644
--- a/pulsar-io/debezium/oracle/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-debezium-oracle
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 2f70452afb985..aa024b86b6521 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-debezium
diff --git a/pulsar-io/debezium/postgres/pom.xml b/pulsar-io/debezium/postgres/pom.xml
index d4c21e5e0cf7d..1645bdf1cf135 100644
--- a/pulsar-io/debezium/postgres/pom.xml
+++ b/pulsar-io/debezium/postgres/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-debezium-postgres
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 169f3bc6e0b11..13f5a001440a5 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-docs
diff --git a/pulsar-io/dynamodb/pom.xml b/pulsar-io/dynamodb/pom.xml
index b7610d30728e8..34c7f7e2961f1 100644
--- a/pulsar-io/dynamodb/pom.xml
+++ b/pulsar-io/dynamodb/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-dynamodb
diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml
index 8a630fb071bef..4b00d709f1825 100644
--- a/pulsar-io/elastic-search/pom.xml
+++ b/pulsar-io/elastic-search/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-elastic-search
Pulsar IO :: ElasticSearch
diff --git a/pulsar-io/file/pom.xml b/pulsar-io/file/pom.xml
index 5cf419d077558..1b78f1bc1988b 100644
--- a/pulsar-io/file/pom.xml
+++ b/pulsar-io/file/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-file
diff --git a/pulsar-io/flume/pom.xml b/pulsar-io/flume/pom.xml
index d73430b5f38f7..7e62c299f01cc 100644
--- a/pulsar-io/flume/pom.xml
+++ b/pulsar-io/flume/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-flume
diff --git a/pulsar-io/hbase/pom.xml b/pulsar-io/hbase/pom.xml
index cefe293449f83..6e9a7dc2e1418 100644
--- a/pulsar-io/hbase/pom.xml
+++ b/pulsar-io/hbase/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-hbase
Pulsar IO :: Hbase
diff --git a/pulsar-io/hdfs2/pom.xml b/pulsar-io/hdfs2/pom.xml
index 4a965f7ae6ede..29a52b17ff551 100644
--- a/pulsar-io/hdfs2/pom.xml
+++ b/pulsar-io/hdfs2/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-hdfs2
Pulsar IO :: Hdfs2
diff --git a/pulsar-io/hdfs3/pom.xml b/pulsar-io/hdfs3/pom.xml
index 3eaaeb8203a2a..68327c7166a2a 100644
--- a/pulsar-io/hdfs3/pom.xml
+++ b/pulsar-io/hdfs3/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-hdfs3
Pulsar IO :: Hdfs3
diff --git a/pulsar-io/influxdb/pom.xml b/pulsar-io/influxdb/pom.xml
index f4556e4702b1c..4a9ea205f6c00 100644
--- a/pulsar-io/influxdb/pom.xml
+++ b/pulsar-io/influxdb/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-influxdb
diff --git a/pulsar-io/jdbc/clickhouse/pom.xml b/pulsar-io/jdbc/clickhouse/pom.xml
index 0d99d61555223..0745d9950a76c 100644
--- a/pulsar-io/jdbc/clickhouse/pom.xml
+++ b/pulsar-io/jdbc/clickhouse/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 09ffe777288d7..c2170c174e918 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-io/jdbc/mariadb/pom.xml b/pulsar-io/jdbc/mariadb/pom.xml
index 8b5a42990cf00..d909af3bf165c 100644
--- a/pulsar-io/jdbc/mariadb/pom.xml
+++ b/pulsar-io/jdbc/mariadb/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-io/jdbc/pom.xml b/pulsar-io/jdbc/pom.xml
index 3d76adc5ae741..85e71ddd6c683 100644
--- a/pulsar-io/jdbc/pom.xml
+++ b/pulsar-io/jdbc/pom.xml
@@ -32,7 +32,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-jdbc
diff --git a/pulsar-io/jdbc/postgres/pom.xml b/pulsar-io/jdbc/postgres/pom.xml
index 701321d70b186..f9fe74a07ebfb 100644
--- a/pulsar-io/jdbc/postgres/pom.xml
+++ b/pulsar-io/jdbc/postgres/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-io/jdbc/sqlite/pom.xml b/pulsar-io/jdbc/sqlite/pom.xml
index bd20a5c2fd4b9..e5fca91b6c582 100644
--- a/pulsar-io/jdbc/sqlite/pom.xml
+++ b/pulsar-io/jdbc/sqlite/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
pulsar-io-jdbc-sqlite
diff --git a/pulsar-io/kafka-connect-adaptor-nar/pom.xml b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
index ef3460645df92..76d532dcdd0fd 100644
--- a/pulsar-io/kafka-connect-adaptor-nar/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-kafka-connect-adaptor-nar
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 5a03a21d85eef..94b3ac3a72c65 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-kafka-connect-adaptor
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index 4aef852521682..19c6518df68b2 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-kafka
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 7bc58f1ae81b1..74d0e57aedaf3 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-kinesis
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 507e56723b5ee..5024127718bb3 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-mongo
diff --git a/pulsar-io/netty/pom.xml b/pulsar-io/netty/pom.xml
index e4d2b3d0ede76..40912b136225b 100644
--- a/pulsar-io/netty/pom.xml
+++ b/pulsar-io/netty/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-netty
diff --git a/pulsar-io/nsq/pom.xml b/pulsar-io/nsq/pom.xml
index 1a0ea5743bfdd..2a41f92dac4a4 100644
--- a/pulsar-io/nsq/pom.xml
+++ b/pulsar-io/nsq/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-nsq
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6195debcb2e7b..f0dc1551f9a9e 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index cb3875b5a339a..8397fc84b361a 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-rabbitmq
diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml
index 4def845713231..64ba40fb3e685 100644
--- a/pulsar-io/redis/pom.xml
+++ b/pulsar-io/redis/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-redis
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
index f2298d54f7efa..ee5072ce2faec 100644
--- a/pulsar-io/solr/pom.xml
+++ b/pulsar-io/solr/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml
index b568be8e7e1ea..9167b3a28f97e 100644
--- a/pulsar-io/twitter/pom.xml
+++ b/pulsar-io/twitter/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-io-twitter
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index ec96ca7d0e2b0..3c5e0eb40ecd1 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-package-management/bookkeeper-storage/pom.xml b/pulsar-package-management/bookkeeper-storage/pom.xml
index 380a17f7e9e34..1af8f528816bc 100644
--- a/pulsar-package-management/bookkeeper-storage/pom.xml
+++ b/pulsar-package-management/bookkeeper-storage/pom.xml
@@ -25,7 +25,7 @@
pulsar-package-management
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-package-management/core/pom.xml b/pulsar-package-management/core/pom.xml
index d40119d1a617a..749ec9313cb3d 100644
--- a/pulsar-package-management/core/pom.xml
+++ b/pulsar-package-management/core/pom.xml
@@ -25,7 +25,7 @@
pulsar-package-management
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-package-management/filesystem-storage/pom.xml b/pulsar-package-management/filesystem-storage/pom.xml
index 365159477beec..3958ce4c313b2 100644
--- a/pulsar-package-management/filesystem-storage/pom.xml
+++ b/pulsar-package-management/filesystem-storage/pom.xml
@@ -25,7 +25,7 @@
pulsar-package-management
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-package-management/pom.xml b/pulsar-package-management/pom.xml
index 9ffd136b6c6c0..5f91f6eb84ef9 100644
--- a/pulsar-package-management/pom.xml
+++ b/pulsar-package-management/pom.xml
@@ -25,7 +25,7 @@
pulsar
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
4.0.0
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 63906fb44dc27..cd408cde17020 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-proxy
diff --git a/pulsar-sql/java-version-trim-agent/pom.xml b/pulsar-sql/java-version-trim-agent/pom.xml
index 48c43e6dda279..cb2e6138c1ded 100644
--- a/pulsar-sql/java-version-trim-agent/pom.xml
+++ b/pulsar-sql/java-version-trim-agent/pom.xml
@@ -24,7 +24,7 @@
pulsar-sql
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/pulsar-sql/pom.xml b/pulsar-sql/pom.xml
index f5331dae23b94..69d313469939b 100644
--- a/pulsar-sql/pom.xml
+++ b/pulsar-sql/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-sql
diff --git a/pulsar-sql/presto-distribution/pom.xml b/pulsar-sql/presto-distribution/pom.xml
index 0c940e623b644..a45c4cf080682 100644
--- a/pulsar-sql/presto-distribution/pom.xml
+++ b/pulsar-sql/presto-distribution/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-sql
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-presto-distribution
diff --git a/pulsar-sql/presto-pulsar-plugin/pom.xml b/pulsar-sql/presto-pulsar-plugin/pom.xml
index 00057135aef52..6ec62caec43f8 100644
--- a/pulsar-sql/presto-pulsar-plugin/pom.xml
+++ b/pulsar-sql/presto-pulsar-plugin/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-sql
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-presto-connector
diff --git a/pulsar-sql/presto-pulsar/pom.xml b/pulsar-sql/presto-pulsar/pom.xml
index 3501466a82834..bb0e06d3f2e08 100644
--- a/pulsar-sql/presto-pulsar/pom.xml
+++ b/pulsar-sql/presto-pulsar/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-sql
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-presto-connector-original
diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml
index 8b8633fb6c420..6433f9db5a176 100644
--- a/pulsar-testclient/pom.xml
+++ b/pulsar-testclient/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/pulsar-transaction/common/pom.xml b/pulsar-transaction/common/pom.xml
index 308275a05b7f8..25a01608f2ce5 100644
--- a/pulsar-transaction/common/pom.xml
+++ b/pulsar-transaction/common/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-transaction-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-transaction-common
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
index 79961dadbda1f..7b5d45bf1e83a 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-transaction-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-transaction-coordinator
diff --git a/pulsar-transaction/pom.xml b/pulsar-transaction/pom.xml
index 5ba989961ce80..23ed756b1ffa2 100644
--- a/pulsar-transaction/pom.xml
+++ b/pulsar-transaction/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-transaction-parent
diff --git a/pulsar-websocket/pom.xml b/pulsar-websocket/pom.xml
index dd7fa68812d78..12282020f1fe0 100644
--- a/pulsar-websocket/pom.xml
+++ b/pulsar-websocket/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/structured-event-log/pom.xml b/structured-event-log/pom.xml
index 5d01dcca3d86e..ad12dc090defd 100644
--- a/structured-event-log/pom.xml
+++ b/structured-event-log/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/testmocks/pom.xml b/testmocks/pom.xml
index 86704930e1905..a9ebf86e78027 100644
--- a/testmocks/pom.xml
+++ b/testmocks/pom.xml
@@ -25,7 +25,7 @@
pulsar
org.apache.pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
testmocks
diff --git a/tests/bc_2_0_0/pom.xml b/tests/bc_2_0_0/pom.xml
index f919d8c2e40d7..74e6b29e4b1d6 100644
--- a/tests/bc_2_0_0/pom.xml
+++ b/tests/bc_2_0_0/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
bc_2_0_0
diff --git a/tests/bc_2_0_1/pom.xml b/tests/bc_2_0_1/pom.xml
index f64327cc32c76..8064d106e302f 100644
--- a/tests/bc_2_0_1/pom.xml
+++ b/tests/bc_2_0_1/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
bc_2_0_1
diff --git a/tests/bc_2_6_0/pom.xml b/tests/bc_2_6_0/pom.xml
index c02bbe5491fcf..92680683c9e0d 100644
--- a/tests/bc_2_6_0/pom.xml
+++ b/tests/bc_2_6_0/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index 8173511dc8fc8..b385c4ab7c521 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar.tests
docker-images
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
java-test-functions
diff --git a/tests/docker-images/java-test-image/pom.xml b/tests/docker-images/java-test-image/pom.xml
index 4e4e901c40689..ee5d46d41294e 100644
--- a/tests/docker-images/java-test-image/pom.xml
+++ b/tests/docker-images/java-test-image/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar.tests
docker-images
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
java-test-image
diff --git a/tests/docker-images/latest-version-image/pom.xml b/tests/docker-images/latest-version-image/pom.xml
index dbcb967073740..feab4c22c37f9 100644
--- a/tests/docker-images/latest-version-image/pom.xml
+++ b/tests/docker-images/latest-version-image/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar.tests
docker-images
- 2.10.6-SNAPSHOT
+ 2.10.6
4.0.0
latest-version-image
diff --git a/tests/docker-images/pom.xml b/tests/docker-images/pom.xml
index 90f71564eba32..78d96ebd53d75 100644
--- a/tests/docker-images/pom.xml
+++ b/tests/docker-images/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
docker-images
Apache Pulsar :: Tests :: Docker Images
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 5971840bb04c2..f4ee351072f05 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
integration
diff --git a/tests/pom.xml b/tests/pom.xml
index 4cdeb4753c456..aaf1eedfd1902 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
org.apache.pulsar.tests
tests-parent
diff --git a/tests/pulsar-client-admin-shade-test/pom.xml b/tests/pulsar-client-admin-shade-test/pom.xml
index 4dace92c27303..828c0e554edd7 100644
--- a/tests/pulsar-client-admin-shade-test/pom.xml
+++ b/tests/pulsar-client-admin-shade-test/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-client-admin-shade-test
diff --git a/tests/pulsar-client-all-shade-test/pom.xml b/tests/pulsar-client-all-shade-test/pom.xml
index 0fab55fe122a7..9af89ea26e7d9 100644
--- a/tests/pulsar-client-all-shade-test/pom.xml
+++ b/tests/pulsar-client-all-shade-test/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-client-all-shade-test
diff --git a/tests/pulsar-client-shade-test/pom.xml b/tests/pulsar-client-shade-test/pom.xml
index 5922f31395524..50d7ca86e9955 100644
--- a/tests/pulsar-client-shade-test/pom.xml
+++ b/tests/pulsar-client-shade-test/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar.tests
tests-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
pulsar-client-shade-test
diff --git a/tiered-storage/file-system/pom.xml b/tiered-storage/file-system/pom.xml
index 664e4b289b410..73ba9a72dc01e 100644
--- a/tiered-storage/file-system/pom.xml
+++ b/tiered-storage/file-system/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
tiered-storage-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index 1d5b6489dc93e..e191e123b8bdf 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
tiered-storage-parent
- 2.10.6-SNAPSHOT
+ 2.10.6
..
diff --git a/tiered-storage/pom.xml b/tiered-storage/pom.xml
index a2447cf6830f7..6e4a8dc2f2b4d 100644
--- a/tiered-storage/pom.xml
+++ b/tiered-storage/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.10.6-SNAPSHOT
+ 2.10.6
..