Skip to content

Commit

Permalink
[branch-2.10][improve][broker] Do not retain the data in the system t…
Browse files Browse the repository at this point in the history
…opic (apache#22022)

### Motivation

For some use case, the users need to store all the messages even though these message are acked by all subscription.
So they set the retention policy of the namespace to infinite retention (setting both time and size limits to `-1`).  But the data in the system topic does not need for infinite retention.

### Modifications

For system topics, do not retain messages that have already been acknowledged.
  • Loading branch information
liangyepianzhou committed Feb 6, 2024
1 parent 49f6a9f commit 13ee3d8
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1646,10 +1646,18 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
}

if (retentionPolicies == null) {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
if (EventsTopicNames.checkTopicIsEventsNames(topicName)
|| EventsTopicNames.isTransactionInternalName(topicName)) {
if (log.isDebugEnabled()) {
log.debug("{} Disable data retention policy for system topic.", topicName);
}
retentionPolicies = new RetentionPolicies(0, 0);
} else {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}
}

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -208,6 +213,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc
);
}

@Test
public void testRetentionPolicesForSystemTopic() throws Exception {
String namespace = "my-tenant/my-ns";
String topicPrefix = "persistent://" + namespace + "/";
admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));
// Check event topics and transaction internal topics.
for (String eventTopic : EventsTopicNames.EVENTS_TOPIC_NAMES) {
checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
}
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_ASSIGN);
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_LOG);
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.PENDING_ACK_STORE_SUFFIX);

// Check common topics.
checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime());
// Specify retention policies for system topic.
pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
pulsar.getConfiguration().setSystemTopicEnabled(true);
admin.topics().createNonPartitionedTopic(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
admin.topicPolicies().setRetention(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
Awaitility.await().untilAsserted(() -> {
checkTopicRetentionPolicy(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
});
}

private void checkSystemTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));

}

private void checkCommonTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
}

private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception {
ManagedLedgerConfig config = pulsar.getBrokerService()
.getManagedLedgerConfig(TopicName.get(topicName)).get();
Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L);
}

private void testCompactionCursorRetention(String topic) throws Exception {
Set<String> keys = Sets.newHashSet("a", "b", "c");
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicNa
.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}

public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(TopicName.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTopicPoliciesSystemTopic(String topic) {
if (topic == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public TopicName load(String name) throws Exception {
public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");

public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);
Expand Down

0 comments on commit 13ee3d8

Please sign in to comment.