Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid potentially blocking calls to metadata on critical threads #12339

Merged
merged 6 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.base.Function;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
Expand All @@ -46,6 +47,7 @@
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -322,48 +324,43 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true);
}

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
boolean remove) {
CompletableFuture<Void> result = new CompletableFuture<>();

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
Set<String> roles,
boolean remove) {
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
result.completeExceptionally(e);
return FutureUtil.failedFuture(e);
}

try {
Policies policies = pulsarResources.getNamespaceResources().getPolicies(namespace)
.orElseThrow(() -> new NotFoundException(namespace + " not found"));
if (remove) {
if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) {
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
}else {
log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles);
result.completeExceptionally(new IllegalArgumentException("couldn't find subscription"));
return result;
}
} else {
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
pulsarResources.getNamespaceResources().setPolicies(namespace, (data)->policies);
CompletableFuture<Void> future =
pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> {
if (remove) {
Set<String> subscriptionAuth =
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName);
if (subscriptionAuth != null) {
subscriptionAuth.removeAll(roles);
} else {
log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace,
merlimat marked this conversation as resolved.
Show resolved Hide resolved
roles, subscriptionName);
throw new IllegalArgumentException("couldn't find subscription");
}
} else {
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
}
return policies;
}).thenRun(() -> {
log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName,
roles);
});

log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles);
result.complete(null);
} catch (NotFoundException e) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", subscriptionName, namespace);
result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace));
} catch (BadVersionException e) {
log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification", subscriptionName, roles, namespace);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on metadata path: " + namespace + ", " + e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace, e);
result.completeExceptionally(
new IllegalStateException("Failed to get permissions for namespace " + namespace));
}
future.exceptionally(ex -> {
log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles, namespace,
ex);
return null;
});

return result;
return future;
}

private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,31 +146,18 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration()
.getMaxMessageSizeCheckIntervalInSeconds());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
updatePublishDispatcher();
}

protected boolean isProducersExceeded() {
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);

if (maxProducers == null) {
Policies policies;
try {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
} catch (Exception e) {
policies = new Policies();
}
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
Expand Down Expand Up @@ -208,21 +195,12 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {
Policies policies;
try {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
e.getMessage());
policies = new Policies();
}
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
Expand Down Expand Up @@ -789,10 +767,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
}

public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(policies);
updatePublishDispatcher();
}

private void updatePublishDispatcher(Policies policies) {
private void updatePublishDispatcher() {
//if topic-level policy exists, try to use topic-level publish rate policy
Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
Expand All @@ -802,9 +780,13 @@ private void updatePublishDispatcher(Policies policies) {
return;
}

Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());

//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2535,18 +2535,12 @@ public int getDefaultNumPartitions(final TopicName topicName) {
}

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};",
topicName, t.getMessage(), t);
return null;
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesIfCached(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
}
log.debug("No autoTopicCreateOverride policy found for {}", topicName);
return null;
Expand All @@ -2568,18 +2562,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
}

private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading autoSubscriptionCreateOverride policy for {}: {};",
topicName, t.getMessage(), t);
return null;
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
}
log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,7 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {

public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
try {
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(namespace);
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
return Optional.empty();
}
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
}

/**
Expand Down