Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[broker]Optimize topicMaxMessageSize with topic local cache. #12830

Merged
merged 2 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -992,12 +992,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int retentionCheckIntervalInSeconds = 120;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Check between intervals to see if max message size of topic policy has updated. default is 60s"
)
private int maxMessageSizeCheckIntervalInSeconds = 60;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The number of partitions per partitioned topic.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTopic implements Topic {
public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicPolicies> {

protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

Expand Down Expand Up @@ -131,10 +131,6 @@ public abstract class AbstractTopic implements Topic {
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
private volatile long usageCount = 0;

private volatile int topicMaxMessageSize = 0;
private volatile long lastTopicMaxMessageSizeCheckTimeStamp = 0;
private final long topicMaxMessageSizeCheckIntervalMs;

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand All @@ -146,14 +142,20 @@ public AbstractTopic(String topic, BrokerService brokerService) {
topicPolicies = new HierarchyTopicPolicies();
updateTopicPolicyByBrokerConfig(topicPolicies, brokerService);

this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(
config.getMaxMessageSizeCheckIntervalInSeconds());

this.lastActive = System.nanoTime();
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(Optional.empty());
}

protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
Expand Down Expand Up @@ -184,6 +186,8 @@ private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicie
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.message_age)
.updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());

topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
}

protected boolean isProducersExceeded() {
Expand All @@ -203,6 +207,22 @@ protected boolean isProducersExceeded() {
return false;
}

protected void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
}
}

protected void unregisterTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.unregisterListener(TopicName.getPartitionedTopicName(topic), this);
}
}

protected boolean isSameAddressProducersExceeded(Producer producer) {
final int maxSameAddressProducers = brokerService.pulsar().getConfiguration()
.getMaxSameAddressProducersPerTopic();
Expand Down Expand Up @@ -928,13 +948,13 @@ protected int getWaitingProducersCount() {
}

protected boolean isExceedMaximumMessageSize(int size) {
if (lastTopicMaxMessageSizeCheckTimeStamp + topicMaxMessageSizeCheckIntervalMs < System.currentTimeMillis()) {
// refresh topicMaxMessageSize from topic policies
topicMaxMessageSize = getTopicPolicies().map(TopicPolicies::getMaxMessageSize).orElse(0);
lastTopicMaxMessageSizeCheckTimeStamp = System.currentTimeMillis();
int topicMaxMessageSize = topicPolicies.getTopicMaxMessageSize().get();
if (topicMaxMessageSize <= 0) {
//invalid setting means this check is disabled.
return false;
}

if (topicMaxMessageSize == 0) {
if (topicMaxMessageSize >= brokerService.pulsar().getConfiguration().getMaxMessageSize()) {
//broker setting does not contain message header and already handled in client and frameDecoder.
return false;
}
return size > topicMaxMessageSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
Expand All @@ -72,6 +73,7 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
Expand All @@ -88,7 +90,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonPersistentTopic extends AbstractTopic implements Topic {
public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener<TopicPolicies> {

// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
Expand Down Expand Up @@ -139,6 +141,7 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.isFenced = false;
registerTopicPolicyListener();
}

public CompletableFuture<Void> initialize() {
Expand Down Expand Up @@ -423,6 +426,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
// deadlock. so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
unregisterTopicPolicyListener();
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
});
Expand Down Expand Up @@ -493,6 +497,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
// so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
unregisterTopicPolicyListener();
closeFuture.complete(null);
});
}).exceptionally(exception -> {
Expand Down Expand Up @@ -1005,6 +1010,14 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
return checkReplicationAndRetryOnFailure();
}

@Override
public void onUpdate(TopicPolicies data) {
if (data == null) {
return;
}
updateTopicPolicy(data);
}

/**
*
* @return Backlog quota for topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.netty.util.concurrent.FastThreadLocal;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -103,7 +102,6 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
Expand Down Expand Up @@ -164,8 +162,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCallback, TopicPolicyListener<TopicPolicies> {
public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {

// Managed ledger associated with the topic
protected final ManagedLedger ledger;
Expand Down Expand Up @@ -3097,6 +3094,8 @@ public void onUpdate(TopicPolicies policies) {
if (policies == null) {
return;
}
updateTopicPolicy(policies);

Optional<Policies> namespacePolicies = getNamespacePolicies();
initializeTopicDispatchRateLimiterIfNeeded(policies);

Expand All @@ -3122,15 +3121,6 @@ public void onUpdate(TopicPolicies policies) {
updateMaxPublishRate(namespacePolicies.orElse(null));
}

topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(policies.getMaxSubscriptionsPerTopic());
topicPolicies.getInactiveTopicPolicies().updateTopicValue(policies.getInactiveTopicPolicies());
Arrays.stream(BacklogQuotaType.values()).forEach(type ->
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
policies.getBackLogQuotaMap() == null ? null :
policies.getBackLogQuotaMap().get(type.toString()))
);


updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent()) {
Expand Down Expand Up @@ -3188,22 +3178,6 @@ protected CompletableFuture<Void> initTopicPolicy() {
return CompletableFuture.completedFuture(null);
}

private void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
}
}

private void unregisterTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.unregisterListener(TopicName.getPartitionedTopicName(topic), this);
}
}

@VisibleForTesting
public MessageDeduplication getMessageDeduplication() {
return messageDeduplication;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand All @@ -111,7 +112,6 @@ protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setMaxMessageSizeCheckIntervalInSeconds(1);
super.internalSetup();

admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
Expand Down Expand Up @@ -1942,14 +1942,24 @@ public void testTopicMaxMessageSizeApi() throws Exception{
}
}

@Test(timeOut = 20000)
public void testTopicMaxMessageSize() throws Exception{
doTestTopicMaxMessageSize(true);
doTestTopicMaxMessageSize(false);
@DataProvider(name = "persistentAndPartition")
public Object[][] implementations() {
return new Object[][]{
{TopicDomain.persistent, true},
{TopicDomain.persistent, false},
{TopicDomain.non_persistent, true},
{TopicDomain.non_persistent, false},
};
}

private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
@Test(dataProvider = "persistentAndPartition")
public void testTopicMaxMessageSize(TopicDomain topicDomain, boolean isPartitioned) throws Exception {
final String topic = TopicName.get(
topicDomain.value(),
NamespaceName.get(myNamespace),
"test-" + UUID.randomUUID()
).toString();

if (isPartitioned) {
admin.topics().createPartitionedTopic(topic, 3);
}
Expand All @@ -1958,8 +1968,25 @@ private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
assertNull(admin.topicPolicies().getMaxMessageSize(topic));
// set msg size
admin.topicPolicies().setMaxMessageSize(topic, 10);
Awaitility.await().until(()
-> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
Jason918 marked this conversation as resolved.
Show resolved Hide resolved
if (isPartitioned) {
for (int i = 0; i <3; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
Awaitility.await().untilAsserted(() -> {
AbstractTopic partition =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(partitionName).get().get();
assertEquals(partition.getHierarchyTopicPolicies().getTopicMaxMessageSize().get(),
Integer.valueOf(10));
});
}
} else {
Awaitility.await().untilAsserted(() -> {
AbstractTopic abstractTopic =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertEquals(abstractTopic.getHierarchyTopicPolicies().getTopicMaxMessageSize().get(),
Integer.valueOf(10));
});
}

assertEquals(admin.topicPolicies().getMaxMessageSize(topic).intValue(), 10);

try {
Expand All @@ -1983,6 +2010,23 @@ private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 412);
}
//make sure policy value take effect.
if (isPartitioned) {
for (int i = 0; i <3; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
Awaitility.await().untilAsserted(() -> {
AbstractTopic partition =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(partitionName).get().get();
assertNull(partition.getHierarchyTopicPolicies().getTopicMaxMessageSize().getTopicValue());
});
}
} else {
Awaitility.await().untilAsserted(() -> {
AbstractTopic abstractTopic =
(AbstractTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
assertNull(abstractTopic.getHierarchyTopicPolicies().getTopicMaxMessageSize().getTopicValue());
});
}

Awaitility.await().untilAsserted(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<InactiveTopicPolicies> inactiveTopicPolicies;
final PolicyHierarchyValue<Integer> maxSubscriptionsPerTopic;
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;

public HierarchyTopicPolicies() {
inactiveTopicPolicies = new PolicyHierarchyValue<>();
Expand All @@ -40,5 +41,6 @@ public HierarchyTopicPolicies() {
.put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
.put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())
.build();
topicMaxMessageSize = new PolicyHierarchyValue<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class PolicyHierarchyValue<T> {
@Getter
private volatile T namespaceValue;

@VisibleForTesting
Jason918 marked this conversation as resolved.
Show resolved Hide resolved
@Getter
private volatile T topicValue;

private volatile T value;
Expand Down