diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java index 3e9c5d9d559a5c..716d9bc31facb3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java @@ -18,16 +18,13 @@ */ package org.apache.pulsar.common.naming; -import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import com.google.common.collect.Sets; import java.util.Collections; import java.util.Set; -import lombok.extern.slf4j.Slf4j; /** * Encapsulate the parsing of the completeTopicName name. */ -@Slf4j public class SystemTopicNames { /** @@ -54,12 +51,6 @@ public class SystemTopicNames { public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; - public static final String TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME = "transaction_coordinator_assign"; - - public static final String TRANSACTION_COORDINATOR_LOG_LOCAL_NAME = "__transaction_log_"; - - public static final String RESOURCE_USAGE_TOPIC_LOCAL_NAME = "resource-usage"; - /** * The set of all local topic names declared above. */ @@ -69,22 +60,18 @@ public class SystemTopicNames { public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME); + NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_LOG_LOCAL_NAME); + NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); public static final TopicName RESOURCE_USAGE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, RESOURCE_USAGE_TOPIC_LOCAL_NAME); + NamespaceName.SYSTEM_NAMESPACE, "resource-usage"); public static boolean isEventSystemTopic(TopicName topicName) { return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()); } - public static boolean isEventSystemTopicLocalName(String localTopicName) { - return EVENTS_TOPIC_NAMES.contains(localTopicName); - } - public static boolean isTransactionCoordinatorAssign(TopicName topicName) { return topicName != null && topicName.toString() .startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString()); @@ -104,23 +91,8 @@ public static boolean isTransactionInternalName(TopicName topicName) { || topic.endsWith(PENDING_ACK_STORE_SUFFIX); } - public static boolean isTransactionInternalLocalName(String localTopicName) { - return TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME.equals(localTopicName) - || TRANSACTION_COORDINATOR_LOG_LOCAL_NAME.equals(localTopicName) - || PENDING_ACK_STORE_SUFFIX.equals(localTopicName); - } - public static boolean isSystemTopic(TopicName topicName) { - int partition = topicName.getPartitionIndex(); - String localTopicName = topicName.getLocalName(); - if (partition >= 0) { - int suffixLen = PARTITIONED_TOPIC_SUFFIX.length() + String.valueOf(partition).length(); - if (localTopicName.length() <= suffixLen) { - log.error("Found an error topic name: {}", topicName); - throw new IllegalArgumentException("Found an error topic name: " + topicName.toString()); - } - localTopicName = localTopicName.substring(0, localTopicName.length() - suffixLen); - } - return isEventSystemTopicLocalName(localTopicName) || isTransactionInternalLocalName(localTopicName); + TopicName nonPartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return isEventSystemTopic(nonPartitionedTopicName) || isTransactionInternalName(nonPartitionedTopicName); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 419a8bece4d9e7..d264eab9574ef9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -55,7 +55,7 @@ public class TopicName implements ServiceUnitId { private final int partitionIndex; private static final LoadingCache cache = CacheBuilder.newBuilder().maximumSize(100000) - .expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader() { + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { @Override public TopicName load(String name) throws Exception { return new TopicName(name); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java index 93e6041bf55d33..92d93021973b1b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java @@ -19,8 +19,6 @@ package org.apache.pulsar.common.naming; import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; -import static org.testng.AssertJUnit.assertTrue; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -46,28 +44,4 @@ public void testIsTopicPoliciesSystemTopic(String topicName, boolean expectedRes assertEquals(expectedResult, SystemTopicNames.isSystemTopic(TopicName.get(topicName))); assertEquals(expectedResult, SystemTopicNames.isEventSystemTopic(TopicName.get(topicName))); } - - @Test - public void testIsSystemTopic() { - String tp1 = "public/default/tp1"; - assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp1))); - String tp2 = "public/default/tp1-partition-0"; - assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp2))); - String tp3 = "public/default/__change_events"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp3))); - String tp4 = "public/default/__change_events-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp4))); - String tp5 = "public/default/transaction_coordinator_assign"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp5))); - String tp6 = "public/default/transaction_coordinator_assign-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp6))); - String tp7 = "persistent://public/default/__change_events"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp7))); - String tp8 = "persistent://public/default/__change_events-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp8))); - String tp9 = "persistent://public/default/transaction_coordinator_assign"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp9))); - String tp10 = "persistent://public/default/transaction_coordinator_assign-partition-0"; - assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp10))); - } }