Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] make some methods async in Namespaces #16814

Merged
merged 1 commit into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1293,13 +1293,6 @@ protected CompletableFuture<Void> internalRemovePublishRateAsync() {
}));
}

protected PublishRate internalGetPublishRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
}

protected CompletableFuture<PublishRate> internalGetPublishRateAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
Expand Down Expand Up @@ -1365,14 +1358,6 @@ protected CompletableFuture<Void> internalDeleteTopicDispatchRateAsync() {
}));
}

@SuppressWarnings("deprecation")
protected DispatchRate internalGetTopicDispatchRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
}

@SuppressWarnings("deprecation")
protected CompletableFuture<DispatchRate> internalGetTopicDispatchRateAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
Expand Down Expand Up @@ -1825,11 +1810,6 @@ protected Boolean internalGetEncryptionRequired() {
return policies.encryption_required;
}

protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).delayed_delivery_policies;
}

protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
Expand Down Expand Up @@ -1957,13 +1937,6 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
}
}

protected RetentionPolicies internalGetRetention() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.retention_policies;
}

private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap = policies.backlog_quota_map;
if (backlogQuotaMap.isEmpty()) {
Expand Down Expand Up @@ -2244,11 +2217,6 @@ protected void internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSub
}
}

protected Integer internalGetMaxUnackedMessagesPerConsumer() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
}

protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessagesPerConsumer) {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Expand All @@ -2273,16 +2241,6 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag
}
}

protected Integer internalGetMaxUnackedMessagesPerSubscription() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
}

protected Integer internalGetMaxSubscriptionsPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_subscriptions_per_topic;
}

protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
Expand Down Expand Up @@ -2399,14 +2357,6 @@ protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompa
return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
}

protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);

return policies.schema_compatibility_strategy;
}

@Deprecated
protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
Expand Down Expand Up @@ -2615,12 +2565,6 @@ private void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
}
}

protected int internalGetMaxTopicsPerNamespace() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_topics_per_namespace != null
? getNamespacePolicies(namespaceName).max_topics_per_namespace : 0;
}

protected void internalRemoveMaxTopicsPerNamespace() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
internalSetMaxTopicsPerNamespace(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -1017,10 +1016,21 @@ public void setDispatchRate(@Suspended AsyncResponse asyncResponse,
+ "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void getDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetTopicDispatchRate();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(
policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName())))
.exceptionally(ex -> {
log.error("[{}] Failed to get dispatch-rate configured for the namespace {}", clientAppId(),
namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down Expand Up @@ -1177,10 +1187,20 @@ public void removeBacklogQuota(@PathParam("property") String property, @PathPara
@ApiOperation(hidden = true, value = "Get retention config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public RetentionPolicies getRetention(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void getRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetRetention();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RETENTION, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.retention_policies))
.exceptionally(ex -> {
log.error("[{}] Failed to get retention config on a namespace {}", clientAppId(), namespaceName,
ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down
Loading