Skip to content

Commit ac46e2e

Browse files
authored
[fix][broker] Disable EntryFilters for system topics (#20514)
1 parent fe556ab commit ac46e2e

File tree

3 files changed

+19
-1
lines changed

3 files changed

+19
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

+4
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,10 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
11631163
}
11641164

11651165
public void updateEntryFilters() {
1166+
if (isSystemTopic()) {
1167+
entryFilters = Pair.of(null, Collections.emptyList());
1168+
return;
1169+
}
11661170
final EntryFilters entryFiltersPolicy = getEntryFiltersPolicy();
11671171
if (entryFiltersPolicy == null || StringUtils.isBlank(entryFiltersPolicy.getEntryFilterNames())) {
11681172
entryFilters = Pair.of(null, Collections.emptyList());

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ public class EntryFilterSupport {
3535

3636
public EntryFilterSupport(Subscription subscription) {
3737
this.subscription = subscription;
38-
if (subscription != null && subscription.getTopic() != null) {
38+
if (subscription != null && subscription.getTopic() != null
39+
&& !subscription.getTopic().isSystemTopic()) {
3940
final BrokerService brokerService = subscription.getTopic().getBrokerService();
4041
final boolean allowOverrideEntryFilters = brokerService
4142
.pulsar().getConfiguration().isAllowOverrideEntryFilters();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java

+13
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import java.util.List;
2122
import java.util.concurrent.CompletableFuture;
2223
import org.apache.bookkeeper.mledger.ManagedLedger;
2324
import org.apache.pulsar.broker.PulsarServerException;
2425
import org.apache.pulsar.broker.namespace.NamespaceService;
2526
import org.apache.pulsar.broker.service.BrokerService;
27+
import org.apache.pulsar.broker.service.plugin.EntryFilter;
2628
import org.apache.pulsar.common.naming.SystemTopicNames;
2729
import org.apache.pulsar.common.naming.TopicName;
30+
import org.apache.pulsar.common.policies.data.EntryFilters;
2831

2932
public class SystemTopic extends PersistentTopic {
3033

@@ -82,4 +85,14 @@ public boolean isEncryptionRequired() {
8285
// System topics are only written by the broker that can't know the encryption context.
8386
return false;
8487
}
88+
89+
@Override
90+
public EntryFilters getEntryFiltersPolicy() {
91+
return null;
92+
}
93+
94+
@Override
95+
public List<EntryFilter> getEntryFilters() {
96+
return null;
97+
}
8598
}

0 commit comments

Comments
 (0)