Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Update topic policies as much as possible when some ex was thrown #21810

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon
/**
* update topic publish dispatcher for this topic.
*/
public void updatePublishDispatcher() {
public void updatePublishRateLimiter() {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2706,7 +2706,7 @@ private void updateMaxPublishRatePerTopicInMessages() {
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerPublishRate();
((AbstractTopic) topic).updatePublishDispatcher();
((AbstractTopic) topic).updatePublishRateLimiter();
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public CompletableFuture<Void> initialize() {
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}
updatePublishDispatcher();
updatePublishRateLimiter();
updateResourceGroupLimiter(policies);
return updateClusterMigrated();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public CompletableFuture<Void> initialize() {
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
updatePublishRateLimiter();
updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
Expand All @@ -371,7 +371,7 @@ public CompletableFuture<Void> initialize() {

updateSubscribeRateLimiter();

updatePublishDispatcher();
updatePublishRateLimiter();

updateResourceGroupLimiter(policies);

Expand Down Expand Up @@ -3061,39 +3061,60 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
return CompletableFuture.completedFuture(null);
}

// Update props.
// The component "EntryFilters" is update in the method "updateTopicPolicyByNamespacePolicy(data)".
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicyByNamespacePolicy(data);
checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;

isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;

updateDispatchRateLimiter();

updateSubscribeRateLimiter();
// Apply policies for components.
List<CompletableFuture<Void>> applyPolicyTasks = applyUpdatedTopicPolicies();
applyPolicyTasks.add(applyUpdatedNamespacePolicies(data));
return FutureUtil.waitForAll(applyPolicyTasks)
.thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic))
.exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
});
}

updatePublishDispatcher();
private CompletableFuture<Void> applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) {
return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies));
}

updateResourceGroupLimiter(data);
private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
List<CompletableFuture<Void>> applyPoliciesFutureList = new ArrayList<>();

List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
// Client permission check.
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync()));
});
producers.values().forEach(producer -> applyPoliciesFutureList.add(
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
// Check message expiry.
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkMessageExpiry()));

return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
preCreateSubscriptionForCompactionIfNeeded());
});
}).exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
});
// Update rate limiters.
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter()));

applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
() -> replicators.forEach((name, replicator) -> replicator.updateRateLimiter())));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
() -> shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter())));

// Other components.
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkReplicationAndRetryOnFailure()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkDeduplicationStatus()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkPersistencePolicies()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
() -> preCreateSubscriptionForCompactionIfNeeded()));

return applyPoliciesFutureList;
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -3736,42 +3757,30 @@ public void onUpdate(TopicPolicies policies) {
if (policies == null) {
return;
}
// Update props.
// The component "EntryFilters" is update in the method "updateTopicPolicy(data)".
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicy(policies);
shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
checkReplicatedSubscriptionControllerState();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
})
.thenCompose(__ -> checkReplicationAndRetryOnFailure())
.thenCompose(__ -> checkDeduplicationStatus())
.thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> checkPersistencePolicies())
.thenAccept(__ -> log.info("[{}] Policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t);
return null;
});

// Apply policies for components(not contains the specified policies which only defined in namespace policies).
FutureUtil.waitForAll(applyUpdatedTopicPolicies())
.thenAccept(__ -> log.info("[{}] topic-level policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic-level policy error: {}", topic, t.getMessage(), t);
return null;
});
}

private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
private void updateSubscriptionsDispatcherRateLimiter() {
subscriptions.forEach((subName, sub) -> {
List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.updateRateLimiter();
}
}));
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.updateRateLimiter();
}
});
return FutureUtil.waitForAll(subscriptionCheckFutures);
}

protected CompletableFuture<Void> initTopicPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand All @@ -50,6 +52,7 @@
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -83,8 +86,11 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -3157,4 +3163,49 @@ public void testProduceChangesWithEncryptionRequired() throws Exception {
});
}

@Test
public void testUpdateRetentionWithPartialFailure() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
admin.topics().createNonPartitionedTopic(tpName);

// Load topic up.
admin.topics().getInternalStats(tpName);

// Inject an error that makes dispatch rate update fail.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
WhiteboxImpl.getInternalState(persistentTopic, "subscriptions");
PersistentSubscription mockedSubscription = Mockito.mock(PersistentSubscription.class);
Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new RuntimeException("Mocked error: getDispatcher"));
subscriptions.put("mockedSubscription", mockedSubscription);

// Update namespace-level retention policies.
RetentionPolicies retentionPolicies1 = new RetentionPolicies(1, 1);
admin.namespaces().setRetentionAsync(myNamespace, retentionPolicies1);

// Verify: update retention will be success even if other component update throws exception.
Awaitility.await().untilAsserted(() -> {
ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
assertEquals(ML.getConfig().getRetentionSizeInMB(), 1);
assertEquals(ML.getConfig().getRetentionTimeMillis(), 1 * 60 * 1000);
});

// Update topic-level retention policies.
RetentionPolicies retentionPolicies2 = new RetentionPolicies(2, 2);
admin.topics().setRetentionAsync(tpName, retentionPolicies2);

// Verify: update retention will be success even if other component update throws exception.
Awaitility.await().untilAsserted(() -> {
ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
assertEquals(ML.getConfig().getRetentionSizeInMB(), 2);
assertEquals(ML.getConfig().getRetentionTimeMillis(), 2 * 60 * 1000);
});

// Cleanup.
subscriptions.clear();
admin.namespaces().removeRetention(myNamespace);
admin.topics().delete(tpName, false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.util;

import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -55,6 +56,11 @@ public static CompletableFuture<Void> waitForAll(Collection<? extends Completabl
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

public static CompletableFuture<Void> runWithCurrentThread(Runnable runnable) {
return CompletableFuture.runAsync(
() -> runnable.run(), MoreExecutors.directExecutor());
}

public static <T> CompletableFuture<List<T>> waitForAll(Stream<CompletableFuture<List<T>>> futures) {
return futures.reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> {
Expand Down
Loading