diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 5146b513fb578..1dbb480f29d66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.TenantResources; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; @@ -311,7 +312,7 @@ public static void main(String[] args) throws Exception { createNamespaceIfAbsent(resources, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster); // Create transaction coordinator assign partitioned topic - createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN, + createPartitionedTopic(configStore, SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, arguments.numTransactionCoordinators); localStore.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index f94491235e6f7..f8d1548c842fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -19,7 +19,7 @@ package org.apache.pulsar; import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN; +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN; import com.beust.jcommander.Parameter; import com.google.common.collect.Sets; import java.io.File; @@ -30,9 +30,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -311,10 +313,15 @@ public void start() throws Exception { createNameSpace(cluster, TopicName.PUBLIC_TENANT, TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE); //create pulsar system namespace createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString()); - if (config.isTransactionCoordinatorEnabled() && !admin.namespaces() - .getTopics(SYSTEM_NAMESPACE.toString()) - .contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) { - admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + if (config.isTransactionCoordinatorEnabled()) { + NamespaceResources.PartitionedTopicResources partitionedTopicResources = + broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources(); + Optional getResult = + partitionedTopicResources.getPartitionedTopicMetadataAsync(TRANSACTION_COORDINATOR_ASSIGN).get(); + if (!getResult.isPresent()) { + partitionedTopicResources.createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(1)); + } } log.debug("--- setup completed ---"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java index 0f66bd01ee0a3..66607dd4c0a41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java @@ -22,7 +22,7 @@ import com.beust.jcommander.Parameter; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.util.CmdGenerateDocs; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -103,7 +103,8 @@ public static void main(String[] args) throws Exception { arguments.cluster); // Create transaction coordinator assign partitioned topic - PulsarClusterMetadataSetup.createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN, + PulsarClusterMetadataSetup.createPartitionedTopic(configStore, + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, arguments.numTransactionCoordinators); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 35cd22d4bbe3f..fc5feedc5e1ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,7 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG; +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -113,7 +113,6 @@ import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.validator.MultipleListenerValidator; import org.apache.pulsar.broker.web.WebService; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet; @@ -273,8 +272,6 @@ public enum State { private volatile CompletableFuture closeFuture; // key is listener name, value is pulsar address and pulsar ssl address private Map advertisedListeners; - private NamespaceName heartbeatNamespaceV1; - private NamespaceName heartbeatNamespaceV2; public PulsarService(ServiceConfiguration config) { this(config, Optional.empty(), (exitCode) -> { @@ -723,8 +720,6 @@ public void start() throws PulsarServerException { createMetricsServlet(); this.addWebServerHandlers(webService, metricsServlet, this.config); this.webService.start(); - heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config); - heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(this.advertisedAddress, this.config); // Refresh addresses and update configuration, since the port might have been dynamically assigned if (config.getBrokerServicePort().equals(Optional.of(0))) { @@ -1129,7 +1124,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { .get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)) { try { TopicName topicName = TopicName.get(topic); - if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) { + if (bundle.includes(topicName) && !isTransactionInternalName(topicName)) { CompletableFuture> future = brokerService.getTopicIfExists(topic); if (future != null) { persistentTopics.add(future); @@ -1709,20 +1704,6 @@ public void shutdownNow() { processTerminator.accept(-1); } - - public static boolean isTransactionSystemTopic(TopicName topicName) { - String topic = topicName.toString(); - return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) - || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) - || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); - } - - public static boolean isTransactionInternalName(TopicName topicName) { - String topic = topicName.toString(); - return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) - || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); - } - @VisibleForTesting protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { return new BrokerService(pulsar, ioEventLoopGroup); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 902546958c54e..569f590034268 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; @@ -124,7 +125,7 @@ public void onLoad(NamespaceBundle bundle) { if (ex == null) { for (String topic : topics) { TopicName name = TopicName.get(topic); - if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() + if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) && name.isPartitioned()) { handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex())); @@ -144,7 +145,7 @@ public void unLoad(NamespaceBundle bundle) { if (ex == null) { for (String topic : topics) { TopicName name = TopicName.get(topic); - if (TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() + if (SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getLocalName() .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) && name.isPartitioned()) { removeTransactionMetadataStore( @@ -170,7 +171,7 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc if (stores.get(tcId) != null) { completableFuture.complete(null); } else { - pulsarService.getBrokerService().checkTopicNsOwnership(TopicName + pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()) .thenRun(() -> internalPinnedExecutor.execute(() -> { final Semaphore tcLoadSemaphore = this.tcLoadSemaphores diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 909c7de8d746c..5ed19f5ed96d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -19,9 +19,9 @@ package org.apache.pulsar.broker.admin.impl; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName; import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC; -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign; +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign; +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import com.fasterxml.jackson.core.JsonProcessingException; import com.github.zafarkhaja.semver.Version; import com.google.common.collect.Lists; @@ -723,7 +723,7 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit future.thenAccept(__ -> { // If the topic name is a partition name, no need to get partition topic metadata again if (topicName.isPartitioned()) { - if (checkTopicIsTransactionCoordinatorAssign(topicName)) { + if (isTransactionCoordinatorAssign(topicName)) { internalUnloadTransactionCoordinatorAsync(asyncResponse, authoritative); } else { internalUnloadNonPartitionedTopicAsync(asyncResponse, authoritative); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index 8eff6815404cd..bbd2e0be9c3e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.admin.Transactions; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TransactionBufferStats; @@ -68,7 +69,7 @@ public abstract class TransactionsBase extends AdminResource { protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative, Integer coordinatorId) { if (coordinatorId != null) { - validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), + validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), authoritative); TransactionMetadataStore transactionMetadataStore = pulsar().getTransactionMetadataStoreService().getStores() @@ -80,7 +81,7 @@ protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean } asyncResponse.resume(transactionMetadataStore.getCoordinatorStats()); } else { - getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN, + getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false).thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, @@ -151,7 +152,7 @@ protected CompletableFuture internalGetPendingAckSta protected void internalGetTransactionMetadata(AsyncResponse asyncResponse, boolean authoritative, int mostSigBits, long leastSigBits) { try { - validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits), + validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits), authoritative); CompletableFuture transactionMetadataFuture = new CompletableFuture<>(); TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService() @@ -260,7 +261,7 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse, boolean authoritative, long timeout, Integer coordinatorId) { try { if (coordinatorId != null) { - validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), + validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), authoritative); TransactionMetadataStore transactionMetadataStore = pulsar().getTransactionMetadataStoreService().getStores() @@ -296,7 +297,7 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse, asyncResponse.resume(transactionMetadata); }); } else { - getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN, + getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false).thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, @@ -349,7 +350,7 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse, protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative, boolean metadata, int coordinatorId) { try { - TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId); + TopicName topicName = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId); validateTopicOwnership(topicName, authoritative); TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService() .getStores().get(TransactionCoordinatorID.get(coordinatorId)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index a321c035c78f9..e1e1dc106762e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -23,6 +23,7 @@ import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; import io.prometheus.client.Counter; @@ -1372,9 +1373,16 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) { } public static boolean isSystemServiceNamespace(String namespace) { + return SYSTEM_NAMESPACE.toString().equals(namespace) + || SLA_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches(); + } + + public static boolean isHeartbeatNamespace(ServiceUnitId ns) { + String namespace = ns.getNamespaceObject().toString(); return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() - || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches() - || SLA_NAMESPACE_PATTERN.matcher(namespace).matches(); + || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches(); } public boolean registerSLANamespace() throws PulsarServerException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java index 38d1bb3c60223..6ce0781781a54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTopicTransportManager.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.slf4j.Logger; @@ -69,7 +70,7 @@ private Producer createProducer() throws PulsarClientException { final int sendTimeoutSecs = 10; return pulsarClient.newProducer(Schema.BYTEBUFFER) - .topic(RESOURCE_USAGE_TOPIC_NAME) + .topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString()) .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS) .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS) .blockIfQueueFull(false) @@ -123,7 +124,7 @@ private class ResourceUsageReader implements ReaderListener, AutoCloseab public ResourceUsageReader() throws PulsarClientException { consumer = pulsarClient.newReader() - .topic(RESOURCE_USAGE_TOPIC_NAME) + .topic(SystemTopicNames.RESOURCE_USAGE_TOPIC.toString()) .startMessageId(MessageId.latest) .readerListener(this) .create(); @@ -165,7 +166,6 @@ public void received(Reader reader, Message msg) { } private static final Logger LOG = LoggerFactory.getLogger(ResourceUsageTopicTransportManager.class); - public static final String RESOURCE_USAGE_TOPIC_NAME = "non-persistent://pulsar/system/resource-usage"; private final PulsarService pulsarService; private final PulsarClient pulsarClient; private final ResourceUsageWriterTask pTask; @@ -177,7 +177,7 @@ public void received(Reader reader, Message msg) { private void createTenantAndNamespace() throws PulsarServerException, PulsarAdminException { // Create a public tenant and default namespace - TopicName topicName = TopicName.get(RESOURCE_USAGE_TOPIC_NAME); + TopicName topicName = SystemTopicNames.RESOURCE_USAGE_TOPIC; PulsarAdmin admin = pulsarService.getAdminClient(); ServiceConfiguration config = pulsarService.getConfig(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 27784332fdc3a..c3f3dc64e5a89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -23,7 +23,7 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic; +import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -121,7 +121,6 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.validator.BindAddressValidator; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; @@ -132,13 +131,13 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.BindAddress; import org.apache.pulsar.common.configuration.FieldContext; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -1376,7 +1375,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, return; } - if (isTransactionSystemTopic(topicName)) { + if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); @@ -1904,7 +1903,7 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit this.getPulsar().getTransactionMetadataStoreService(); // if the store belongs to this bundle, remove and close the store this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store -> - serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN + serviceUnit.includes(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN .getPartition((int) (store.getTransactionCoordinatorID().getId())))) .map(TransactionMetadataStore::getTransactionCoordinatorID) .forEach(tcId -> closeFutures.add(metadataStoreService.removeTransactionMetadataStore(tcId))); @@ -2936,25 +2935,8 @@ public boolean isSystemTopic(String topic) { } public boolean isSystemTopic(TopicName topicName) { - if (topicName.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE) - || topicName.getNamespaceObject().equals(pulsar.getHeartbeatNamespaceV1()) - || topicName.getNamespaceObject().equals(pulsar.getHeartbeatNamespaceV2())) { - return true; - } - - TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - - // event topic - if (EventsTopicNames.checkTopicIsEventsNames(nonePartitionedTopicName)) { - return true; - } - - String localName = nonePartitionedTopicName.getLocalName(); - // transaction pending ack topic - if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) { - return true; - } - return false; + return NamespaceService.isSystemServiceNamespace(topicName.getNamespace()) + || SystemTopicNames.isSystemTopic(topicName); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index fc0700eaf7771..f2db5b6be5193 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -236,8 +236,7 @@ public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicNa public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { CompletableFuture result = new CompletableFuture<>(); NamespaceName namespace = namespaceBundle.getNamespaceObject(); - if (NamespaceService.checkHeartbeatNamespace(namespace) != null - || NamespaceService.checkHeartbeatNamespaceV2(namespace) != null) { + if (NamespaceService.isHeartbeatNamespace(namespace)) { result.complete(null); return result; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 337b4c7fc1b24..1edc7ae12e8f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -42,6 +42,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.AbstractTopic; @@ -71,7 +72,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; -import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; @@ -536,11 +536,7 @@ public CompletableFuture stopReplProducers() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal()) { - return CompletableFuture.completedFuture(null); - } - NamespaceName heartbeatNamespace = brokerService.pulsar().getHeartbeatNamespaceV2(); - if (name.getNamespaceObject().equals(heartbeatNamespace)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index ec77fe69eed7d..fd0922f8f7198 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; +import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import java.util.Collections; @@ -148,7 +148,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() - && !checkTopicIsEventsNames(TopicName.get(topicName))) { + && !isEventSystemTopic(TopicName.get(topicName))) { this.pendingAckHandle = new PendingAckHandleImpl(this); } else { this.pendingAckHandle = new PendingAckHandleDisabled(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 85ee429c026b0..eed8ba0d3b883 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -21,7 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled; -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; +import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.carrotsearch.hppc.ObjectObjectHashMap; @@ -78,6 +78,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.AbstractTopic; @@ -127,8 +128,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; -import org.apache.pulsar.common.events.EventsTopicNames; -import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -292,7 +292,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS checkReplicatedSubscriptionControllerState(); TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() - && !checkTopicIsEventsNames(topicName)) { + && !isEventSystemTopic(topicName)) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { @@ -719,7 +719,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } try { - if (!topic.endsWith(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME) + if (!topic.endsWith(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME) && !checkSubscriptionTypesEnable(subType)) { return FutureUtil.failedFuture( new NotAllowedException("Topic[{" + topic + "}] doesn't support " @@ -1377,11 +1377,7 @@ private CompletableFuture checkPersistencePolicies() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal()) { - return CompletableFuture.completedFuture(null); - } - NamespaceName heartbeatNamespace = brokerService.pulsar().getHeartbeatNamespaceV2(); - if (name.getNamespaceObject().equals(heartbeatNamespace)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 988310858cda1..715a684902f1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -22,9 +22,9 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.common.events.EventsTopicNames; -import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; public class SystemTopic extends PersistentTopic { @@ -65,7 +65,7 @@ public void checkGC() { @Override public CompletableFuture checkReplication() { - if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) { + if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { return super.checkReplication(); } return CompletableFuture.completedFuture(null); @@ -75,10 +75,6 @@ public CompletableFuture checkReplication() { public boolean isCompactionEnabled() { // All system topics are using compaction except `HealthCheck`, // even though is not explicitly set in the policies. - TopicName name = TopicName.get(topic); - NamespaceName heartbeatNamespaceV1 = brokerService.pulsar().getHeartbeatNamespaceV1(); - NamespaceName heartbeatNamespaceV2 = brokerService.pulsar().getHeartbeatNamespaceV2(); - return !name.getNamespaceObject().equals(heartbeatNamespaceV1) - && !name.getNamespaceObject().equals(heartbeatNamespaceV2); + return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java index e6ac1535f43c7..3dbda26b0a8ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus; -import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; +import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; import io.netty.util.concurrent.FastThreadLocal; import java.util.HashMap; import java.util.Map; @@ -79,7 +79,7 @@ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, topic.getSubscriptions().values().forEach(subscription -> { try { localManageLedgerStats.get().reset(); - if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName())) + if (!isEventSystemTopic(TopicName.get(subscription.getTopic().getName())) && subscription instanceof PersistentSubscription && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) { ManagedLedger managedLedger = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java index 59fb8b032e94f..9e162f741b2c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java @@ -21,8 +21,8 @@ import org.apache.pulsar.broker.service.TransactionBufferSnapshotService; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.events.EventType; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.slf4j.Logger; @@ -38,7 +38,7 @@ public NamespaceEventsSystemTopicFactory(PulsarClient client) { public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) { TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName, - EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); log.info("Create topic policies system topic client {}", topicName.toString()); return new TopicPoliciesSystemTopicClient(client, topicName); } @@ -46,7 +46,7 @@ public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(Names public TransactionBufferSystemTopicClient createTransactionBufferSystemTopicClient(NamespaceName namespaceName, TransactionBufferSnapshotService transactionBufferSnapshotService) { TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName, - EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); log.info("Create transaction buffer snapshot client, topicName : {}", topicName.toString()); return new TransactionBufferSystemTopicClient(client, topicName, transactionBufferSnapshotService); } @@ -55,10 +55,10 @@ public static TopicName getSystemTopicName(NamespaceName namespaceName, EventTyp switch (eventType) { case TOPIC_POLICY: return TopicName.get(TopicDomain.persistent.value(), namespaceName, - EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); case TRANSACTION_BUFFER_SNAPSHOT: return TopicName.get(TopicDomain.persistent.value(), namespaceName, - EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); default: return null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index ccc1628154c9f..289cd14a7162c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; @@ -61,10 +62,6 @@ public class MLPendingAckStore implements PendingAckStore { private final ManagedCursor cursor; - public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; - - public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; - private final SpscArrayQueue entryQueue; //this is for replay @@ -412,11 +409,11 @@ public CompletableFuture getManagedLedger() { } public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) { - return TopicName.get(originTopicName) + "-" + subName + PENDING_ACK_STORE_SUFFIX; + return TopicName.get(originTopicName) + "-" + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX; } public static String getTransactionPendingAckStoreCursorName() { - return PENDING_ACK_STORE_CURSOR_NAME; + return SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME; } private static final Logger log = LoggerFactory.getLogger(MLPendingAckStore.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index baa5d3fe332de..1690210a80a45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -775,11 +775,7 @@ protected CompletableFuture validateGlobalNamespaceOwnershipAsync(Namespac public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { - if (!namespace.isGlobal()) { - return CompletableFuture.completedFuture(null); - } - NamespaceName heartbeatNamespace = pulsarService.getHeartbeatNamespaceV2(); - if (namespace.equals(heartbeatNamespace)) { + if (!namespace.isGlobal() || NamespaceService.isHeartbeatNamespace(namespace)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 60e591446d185..320a375145ad5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -65,8 +65,8 @@ import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandSubscribe; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -2794,7 +2794,7 @@ public void testPoliciesCanBeDeletedWithTopic() throws Exception { Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull(); }); - String topicPoliciesTopic = "persistent://" + myNamespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + String topicPoliciesTopic = "persistent://" + myNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get(); // Trigger compaction and make sure it is finished. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java index add277fc524d7..c9d554a2ca8d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java @@ -23,7 +23,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -48,11 +49,18 @@ protected void cleanup() throws Exception { @Test public void testRedirectOfGetCoordinatorInternalStats() throws Exception { Map map = admin.lookups() - .lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + .lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); while (map.containsValue(getPulsarServiceList().get(0).getBrokerServiceUrl())) { - admin.topics().deletePartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS); - map = admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + pulsarServiceList.get(0).getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .deletePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN); + pulsarServiceList.get(0).getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(NUM_PARTITIONS)); + map = admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); } //init tc stores pulsarClient = PulsarClient.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index aff09f2d7f1d8..d46b640aac326 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -22,7 +22,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.http.HttpStatus; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -36,8 +35,10 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -486,14 +487,14 @@ public void testGetPendingAckInternalStats() throws Exception { ManagedLedgerInternalStats managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats; assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default", "testGetPendingAckInternalStats" + "-" - + subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(), + + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(), stats.pendingAckLogStats.managedLedgerName); verifyManagedLegerInternalStats(managedLedgerInternalStats, 16); ManagedLedgerInternalStats finalManagedLedgerInternalStats = managedLedgerInternalStats; managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> { - assertEquals(s, MLPendingAckStore.PENDING_ACK_STORE_CURSOR_NAME); + assertEquals(s, SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME); assertEquals(cursorStats.readPosition, finalManagedLedgerInternalStats.lastConfirmedEntry); }); @@ -503,7 +504,7 @@ public void testGetPendingAckInternalStats() throws Exception { assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default", "testGetPendingAckInternalStats" + "-" - + subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(), + + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(), stats.pendingAckLogStats.managedLedgerName); assertNull(managedLedgerInternalStats.ledgers.get(0).metadata); } @@ -528,8 +529,12 @@ private static void verifyCoordinatorStats(String state, } private void initTransaction(int coordinatorSize) throws Exception { - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), coordinatorSize); - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + pulsar.getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(coordinatorSize)); + admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build(); pulsarClient.close(); Awaitility.await().until(() -> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java index 4e54d1b3326ca..7b96aed47a1ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceUsageTransportManagerTest.java @@ -24,6 +24,7 @@ import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.testng.annotations.AfterClass; @@ -55,7 +56,7 @@ protected void cleanup() throws Exception { @Test public void testNamespaceCreation() throws Exception { - TopicName topicName = TopicName.get(ResourceUsageTopicTransportManager.RESOURCE_USAGE_TOPIC_NAME); + TopicName topicName = SystemTopicNames.RESOURCE_USAGE_TOPIC; assertTrue(admin.tenants().getTenants().contains(topicName.getTenant())); assertTrue(admin.namespaces().getNamespaces(topicName.getTenant()).contains(topicName.getNamespace())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index 1b3ca1616fa61..936e3e0d26342 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -25,7 +25,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.TopicType; @@ -395,7 +395,7 @@ public void testAutoCreationOfSystemTopicTransactionBufferSnapshot() throws Exce pulsar.getConfiguration().setAllowAutoTopicCreation(false); pulsar.getConfiguration().setSystemTopicEnabled(true); - final String topicString = "persistent://prop/ns-abc/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT; + final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT; pulsarClient.newProducer().topic(topicString).create(); @@ -408,7 +408,7 @@ public void testAutoCreationOfSystemTopicNamespaceEvents() throws Exception { pulsar.getConfiguration().setAllowAutoTopicCreation(false); pulsar.getConfiguration().setSystemTopicEnabled(true); - final String topicString = "persistent://prop/ns-abc/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + final String topicString = "persistent://prop/ns-abc/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; pulsarClient.newProducer().topic(topicString).create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index b4702fe01008e..8731e78234882 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.broker.service; -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN; -import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG; +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN; +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java index 69db17f46932d..6412d7f32a90d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java @@ -20,8 +20,11 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +53,18 @@ private void baseSetupCommon() throws Exception { admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test")); } + protected void createTransactionCoordinatorAssign() throws MetadataStoreException { + createTransactionCoordinatorAssign(1); + } + + protected void createTransactionCoordinatorAssign(int partitions) throws MetadataStoreException { + pulsar.getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(partitions)); + } + void rolloverPerIntervalStats() { try { pulsar.getExecutor().submit(() -> pulsar.getBrokerService().updateRates()).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index c31373c05fcd6..c8c257992cf02 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -82,9 +82,10 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.transaction.TransactionImpl; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.ClusterData; @@ -948,23 +949,23 @@ public void testReplicatorWithPartitionedTopic() throws Exception { admin2.topics().updatePartitionedTopic(persistentTopicName, 5); assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5); assertEquals((int) admin2.topics().getList(namespace).stream().filter(topic -> - !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 5); + !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 5); // Update partitioned topic from R3 admin3.topics().updatePartitionedTopic(persistentTopicName, 6); assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6); assertEquals(admin3.topics().getList(namespace).stream().filter(topic -> - !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 6); + !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 6); // Update partitioned topic from R1 admin1.topics().updatePartitionedTopic(persistentTopicName, 7); assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7); assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7); assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7); assertEquals(admin1.topics().getList(namespace).stream().filter(topic -> - !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7); + !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7); assertEquals(admin2.topics().getList(namespace).stream().filter(topic -> - !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7); + !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7); assertEquals(admin3.topics().getList(namespace).stream().filter(topic -> - !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7); + !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7); } /** @@ -1299,7 +1300,7 @@ public void testDoNotReplicateSystemTopic() throws Exception { String topic = TopicName.get("persistent", NamespaceName.get(namespace), "testDoesNotReplicateSystemTopic").toString(); String systemTopic = TopicName.get("persistent", NamespaceName.get(namespace), - EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT).toString(); + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT).toString(); admin1.topics().createNonPartitionedTopic(topic); // Replicator will not replicate System Topic other than topic policies initTransaction(2, admin1, pulsar1.getBrokerServiceUrl(), pulsar1); @@ -1326,8 +1327,12 @@ public void testDoNotReplicateSystemTopic() throws Exception { private void initTransaction(int coordinatorSize, PulsarAdmin admin, String ServiceUrl, PulsarService pulsarService) throws Exception { admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), coordinatorSize); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), coordinatorSize); - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + pulsarService.getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(coordinatorSize)); + admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(ServiceUrl).enableTransaction(true).build(); pulsarClient.close(); Awaitility.await().until(() -> @@ -1361,7 +1366,7 @@ private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> { list.clear(); list.addAll(admin.topics().getList(namespace).stream() - .filter(topic -> !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) + .filter(topic -> !topic.contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) .collect(Collectors.toList())); return list.size() == expectedTopicList.size(); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index e50a39c5dfd91..cd073737cece9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - import com.google.common.collect.Sets; import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.mledger.Position; @@ -39,7 +38,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; @@ -66,8 +65,8 @@ protected void setup() throws Exception { super.baseSetup(configuration); admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); - admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + createTransactionCoordinatorAssign(16); + admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); } @AfterMethod(alwaysRun = true) @@ -81,7 +80,7 @@ public void testAddAndRemoveTransactionMetadataStore() throws Exception { TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); Assert.assertNotNull(transactionMetadataStoreService); - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()); + admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()); transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0)); Awaitility.await().until(() -> transactionMetadataStoreService.getStores().size() == 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java index 98ac2618eeab8..6da6026b34fc2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; - import com.google.common.collect.Sets; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; @@ -31,7 +30,6 @@ import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; @@ -99,7 +97,7 @@ public void testTransactionTopic() throws Exception { admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + createTransactionCoordinatorAssign(); ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); managedLedgerConfig.setMaxEntriesPerLedger(2); new MLTransactionLogImpl(TransactionCoordinatorID.get(0), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index 0c9f877150a8e..668c9254b49ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -40,7 +40,7 @@ import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; @@ -78,7 +78,7 @@ protected void setup() throws Exception { .allowedClusters(Sets.newHashSet("test")) .build()); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + createTransactionCoordinatorAssign(); } @AfterMethod(alwaysRun = true) @@ -90,7 +90,7 @@ protected void cleanup() throws Exception { @Test public void testTransactionCoordinatorMetrics() throws Exception{ long timeout = 10000; - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get(1); pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne); @@ -127,7 +127,7 @@ public void testTransactionCoordinatorRateMetrics() throws Exception{ String topic = "persistent://" + ns1 + "/test_coordinator_metrics"; String subName = "test_coordinator_metrics"; TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); - admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne); admin.topics().createNonPartitionedTopic(topic); admin.topics().createSubscription(topic, subName, MessageId.earliest); @@ -229,7 +229,7 @@ public void testManagedLedgerMetrics() throws Exception{ String topic = "persistent://" + ns1 + "/test_managed_ledger_metrics"; String subName = "test_managed_ledger_metrics"; admin.topics().createNonPartitionedTopic(topic); - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get(); admin.topics().createSubscription(topic, subName, MessageId.earliest); @@ -291,7 +291,7 @@ public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception{ String subName = "test_managed_ledger_metrics"; String subName2 = "test_pending_ack_no_init"; admin.topics().createNonPartitionedTopic(topic); - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get(); admin.topics().createSubscription(topic, subName, MessageId.earliest); @@ -352,7 +352,7 @@ public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception{ @Test public void testDuplicateMetricTypeDefinitions() throws Exception{ - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get(1); pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index b680d74b86bae..244d7587a42e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -30,10 +30,10 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.events.TopicPoliciesEvent; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -138,7 +138,7 @@ public void testSendAndReceiveNamespaceEvents() throws Exception { @Test(timeOut = 30000) public void checkSystemTopic() throws PulsarAdminException { - final String systemTopic = "persistent://" + NAMESPACE1 + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + final String systemTopic = "persistent://" + NAMESPACE1 + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; final String normalTopic = "persistent://" + NAMESPACE1 + "/normal_topic"; admin.topics().createPartitionedTopic(normalTopic, 3); TopicName systemTopicName = TopicName.get(systemTopic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index d506f7127678f..8923d01d92b98 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -33,8 +33,8 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.naming.TopicName; @@ -88,7 +88,7 @@ public void testAutoCreatedPartitionedSystemTopic() throws Exception { SystemTopicClient.Reader reader = systemTopicClientForNamespace.newReader(); int partitions = admin.topics().getPartitionedTopicMetadata( - String.format("persistent://%s/%s", ns, EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions; + String.format("persistent://%s/%s", ns, SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).partitions; Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1); Assert.assertEquals(partitions, PARTITIONS); Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index 78259c23b8a58..3bc31bbc4fca8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.transaction; -import static org.apache.pulsar.common.events.EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT; +import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -67,7 +67,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.events.EventType; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; @@ -252,7 +251,7 @@ private void testTakeSnapshot() throws IOException, ExecutionException, Interrup ReaderBuilder readerBuilder = pulsarClient .newReader(Schema.AVRO(TransactionBufferSnapshot.class)) .startMessageId(MessageId.earliest) - .topic(NAMESPACE1 + "/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + .topic(NAMESPACE1 + "/" + TRANSACTION_BUFFER_SNAPSHOT); Reader reader = readerBuilder.create(); MessageId messageId1 = producer.newMessage(tnx1).value("test").send(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java index 24e5c5aaf1e6d..32efdb51659b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; - import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -45,7 +44,6 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; @@ -82,7 +80,7 @@ public void setup() throws Exception { admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + createTransactionCoordinatorAssign(1); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 61d69661d6278..60ecf04e60213 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX; +import static org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -32,7 +32,6 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import io.netty.buffer.Unpooled; import io.netty.util.Timeout; import java.lang.reflect.Field; @@ -93,8 +92,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.events.EventType; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; @@ -230,7 +229,7 @@ public void brokerNotInitTxnManagedLedgerTopic() throws Exception { Assert.assertNull(topics.get(TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString() + 0)); - Assert.assertNull(topics.get(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())); + Assert.assertNull(topics.get(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())); Assert.assertNull(topics.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName))); } @@ -347,7 +346,7 @@ public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception { ReaderBuilder readerBuilder = pulsarClient .newReader(Schema.AVRO(TransactionBufferSnapshot.class)) .startMessageId(MessageId.earliest) - .topic(NAMESPACE1 + "/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + .topic(NAMESPACE1 + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); Reader reader = readerBuilder.create(); long waitSnapShotTime = getPulsarServiceList().get(0).getConfiguration() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index bdcde1be8141c..dc034daa061ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -50,9 +50,11 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.tests.TestRetrySupport; @@ -123,7 +125,7 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), numPartitionsOfTC); + createTransactionCoordinatorAssign(numPartitionsOfTC); if (topic != null) { admin.tenants().createTenant(TENANT, new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); @@ -144,6 +146,14 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int .build(); } + protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) throws MetadataStoreException { + pulsarServiceList.get(0).getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(numPartitionsOfTC)); + } + protected void startBroker() throws Exception { for (int i = 0; i < brokerCount; i++) { conf.setClusterName(CLUSTER_NAME); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java index 43d31e7f9ffcd..77ac464c1d497 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferCloseTest.java @@ -26,8 +26,8 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.PublisherStats; @@ -38,7 +38,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; - import java.util.List; import java.util.concurrent.TimeUnit; @@ -110,7 +109,7 @@ private TopicName createAndLoadTopic(boolean isPartition, int partitionCount) private void checkSnapshotPublisherCount(String namespace, int expectCount) throws PulsarAdminException { TopicName snTopicName = TopicName.get(TopicDomain.persistent.value(), NamespaceName.get(namespace), - EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); List publisherStatsList = (List) admin.topics() .getStats(snTopicName.getPartitionedTopicName()).getPublishers(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index 873509ff6bf97..192353a2da049 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; - import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.map.LinkedMap; import org.apache.pulsar.broker.service.BrokerService; @@ -49,9 +48,8 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; - import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -144,7 +142,7 @@ public void testTransactionBufferLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() @@ -249,7 +247,7 @@ public void testPendingAckLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN).get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java index 01027868e272b..c95becc2b3d45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java @@ -24,7 +24,7 @@ import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ServiceUrlProvider; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.awaitility.Awaitility; import org.testng.Assert; @@ -81,7 +81,7 @@ public void testTransactionMetaStoreUnload() throws Exception { // close pulsar client will not init tc again pulsarClient.close(); - admin.topics().unload(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + admin.topics().unload(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString()); for (int i = 0; i < 16; i++) { final int f = i; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 432fc6710718c..54a5e07405e83 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.util.MathUtils; import org.apache.pulsar.common.api.proto.Subscription; import org.apache.pulsar.common.api.proto.TxnAction; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; @@ -78,7 +79,7 @@ public void start() throws TransactionCoordinatorClientException { @Override public CompletableFuture startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { - return pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN) + return pulsarClient.getLookup().getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN) .thenCompose(partitionMeta -> { List> connectFutureList = new ArrayList<>(); if (LOG.isDebugEnabled()) { @@ -116,9 +117,10 @@ public CompletableFuture startAsync() { private String getTCAssignTopicName(int partition) { if (partition >= 0) { - return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition; + return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString() + + TopicName.PARTITIONED_TOPIC_SUFFIX + partition; } else { - return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(); + return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java similarity index 52% rename from pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java index 7ed6308ec04be..eaab826146000 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java @@ -16,17 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.events; +package org.apache.pulsar.common.naming; import com.google.common.collect.Sets; import java.util.Collections; import java.util.Set; -import org.apache.pulsar.common.naming.TopicName; /** - * System topic names for each {@link EventType}. + * Encapsulate the parsing of the completeTopicName name. */ -public class EventsTopicNames { +public class SystemTopicNames { /** * Local topic name for the namespace events. @@ -38,19 +37,34 @@ public class EventsTopicNames { */ public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot"; + + public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; + + public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; + /** * The set of all local topic names declared above. */ public static final Set EVENTS_TOPIC_NAMES = Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT)); - public static boolean checkTopicIsEventsNames(TopicName topicName) { + + public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); + + public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + + public static final TopicName RESOURCE_USAGE_TOPIC = TopicName.get(TopicDomain.non_persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "resource-usage"); + + public static boolean isEventSystemTopic(TopicName topicName) { return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()); } - public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) { + public static boolean isTransactionCoordinatorAssign(TopicName topicName) { return topicName != null && topicName.toString() - .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); + .startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString()); } public static boolean isTopicPoliciesSystemTopic(String topic) { @@ -59,4 +73,16 @@ public static boolean isTopicPoliciesSystemTopic(String topic) { } return TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME); } + + public static boolean isTransactionInternalName(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString()) + || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) + || topic.endsWith(PENDING_ACK_STORE_SUFFIX); + } + + public static boolean isSystemTopic(TopicName topicName) { + TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return isEventSystemTopic(nonePartitionedTopicName) || isTransactionInternalName(nonePartitionedTopicName); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 97f99b19f19cc..5f03c60ece906 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -60,12 +60,6 @@ public TopicName load(String name) throws Exception { } }); - public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); - - public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); - public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name); diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java index e4cc0bf23e6c0..1cf0d7da35437 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java @@ -44,7 +44,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TokenOauth2AuthenticatedProducerConsumerTest; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -146,7 +147,11 @@ protected final void clientSetup() throws Exception { new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + pulsar.getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(1)); replacePulsarClient(PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString()) .statsInterval(0, TimeUnit.SECONDS) diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java index 936f70bedf360..4ef3bba46c858 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java @@ -33,14 +33,14 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -74,7 +74,9 @@ protected void setup() throws Exception { new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(1)); } @AfterMethod(alwaysRun = true)