Skip to content

Commit

Permalink
remove the improvement for the method isSystemTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jul 18, 2024
1 parent 48ccb02 commit cd2ec2e
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
*/
package org.apache.pulsar.common.naming;

import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;

/**
* Encapsulate the parsing of the completeTopicName name.
*/
@Slf4j
public class SystemTopicNames {

/**
Expand All @@ -54,12 +51,6 @@ public class SystemTopicNames {

public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";

public static final String TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME = "transaction_coordinator_assign";

public static final String TRANSACTION_COORDINATOR_LOG_LOCAL_NAME = "__transaction_log_";

public static final String RESOURCE_USAGE_TOPIC_LOCAL_NAME = "resource-usage";

/**
* The set of all local topic names declared above.
*/
Expand All @@ -69,22 +60,18 @@ public class SystemTopicNames {


public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME);
NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");

public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_COORDINATOR_LOG_LOCAL_NAME);
NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");

public static final TopicName RESOURCE_USAGE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, RESOURCE_USAGE_TOPIC_LOCAL_NAME);
NamespaceName.SYSTEM_NAMESPACE, "resource-usage");

public static boolean isEventSystemTopic(TopicName topicName) {
return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
}

public static boolean isEventSystemTopicLocalName(String localTopicName) {
return EVENTS_TOPIC_NAMES.contains(localTopicName);
}

public static boolean isTransactionCoordinatorAssign(TopicName topicName) {
return topicName != null && topicName.toString()
.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString());
Expand All @@ -104,23 +91,8 @@ public static boolean isTransactionInternalName(TopicName topicName) {
|| topic.endsWith(PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTransactionInternalLocalName(String localTopicName) {
return TRANSACTION_COORDINATOR_ASSIGN_LOCAL_NAME.equals(localTopicName)
|| TRANSACTION_COORDINATOR_LOG_LOCAL_NAME.equals(localTopicName)
|| PENDING_ACK_STORE_SUFFIX.equals(localTopicName);
}

public static boolean isSystemTopic(TopicName topicName) {
int partition = topicName.getPartitionIndex();
String localTopicName = topicName.getLocalName();
if (partition >= 0) {
int suffixLen = PARTITIONED_TOPIC_SUFFIX.length() + String.valueOf(partition).length();
if (localTopicName.length() <= suffixLen) {
log.error("Found an error topic name: {}", topicName);
throw new IllegalArgumentException("Found an error topic name: " + topicName.toString());
}
localTopicName = localTopicName.substring(0, localTopicName.length() - suffixLen);
}
return isEventSystemTopicLocalName(localTopicName) || isTransactionInternalLocalName(localTopicName);
TopicName nonPartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
return isEventSystemTopic(nonPartitionedTopicName) || isTransactionInternalName(nonPartitionedTopicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class TopicName implements ServiceUnitId {
private final int partitionIndex;

private static final LoadingCache<String, TopicName> cache = CacheBuilder.newBuilder().maximumSize(100000)
.expireAfterWrite(30, TimeUnit.MINUTES).build(new CacheLoader<String, TopicName>() {
.expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, TopicName>() {
@Override
public TopicName load(String name) throws Exception {
return new TopicName(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.pulsar.common.naming;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -46,28 +44,4 @@ public void testIsTopicPoliciesSystemTopic(String topicName, boolean expectedRes
assertEquals(expectedResult, SystemTopicNames.isSystemTopic(TopicName.get(topicName)));
assertEquals(expectedResult, SystemTopicNames.isEventSystemTopic(TopicName.get(topicName)));
}

@Test
public void testIsSystemTopic() {
String tp1 = "public/default/tp1";
assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp1)));
String tp2 = "public/default/tp1-partition-0";
assertFalse(SystemTopicNames.isSystemTopic(TopicName.get(tp2)));
String tp3 = "public/default/__change_events";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp3)));
String tp4 = "public/default/__change_events-partition-0";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp4)));
String tp5 = "public/default/transaction_coordinator_assign";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp5)));
String tp6 = "public/default/transaction_coordinator_assign-partition-0";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp6)));
String tp7 = "persistent://public/default/__change_events";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp7)));
String tp8 = "persistent://public/default/__change_events-partition-0";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp8)));
String tp9 = "persistent://public/default/transaction_coordinator_assign";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp9)));
String tp10 = "persistent://public/default/transaction_coordinator_assign-partition-0";
assertTrue(SystemTopicNames.isSystemTopic(TopicName.get(tp10)));
}
}

0 comments on commit cd2ec2e

Please sign in to comment.