From 49f58e5ac323545b0d996308efba035c18e5b706 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 18 Oct 2017 11:06:25 -0700 Subject: [PATCH 1/5] Introduce Failure-domain and Anti-affinity-namespace group --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../pulsar/broker/ServiceConfiguration.java | 11 + .../cache/ConfigurationCacheService.java | 50 ++ .../apache/pulsar/broker/PulsarService.java | 2 +- .../pulsar/broker/admin/AdminResource.java | 24 +- .../apache/pulsar/broker/admin/Clusters.java | 208 ++++++- .../pulsar/broker/admin/Namespaces.java | 140 ++++- .../loadbalance/impl/LoadManagerShared.java | 242 +++++++- .../impl/ModularLoadManagerImpl.java | 104 +++- .../impl/SimpleLoadManagerImpl.java | 2 +- .../pulsar/broker/web/PulsarWebResource.java | 2 +- .../pulsar/broker/SLAMonitoringTest.java | 2 +- .../pulsar/broker/admin/AdminApiTest.java | 16 +- .../pulsar/broker/admin/AdminApiTest2.java | 71 ++- .../apache/pulsar/broker/admin/AdminTest.java | 11 +- .../pulsar/broker/admin/NamespacesTest.java | 2 +- .../pulsar/broker/auth/AuthorizationTest.java | 2 +- .../auth/MockedPulsarServiceBaseTest.java | 3 +- .../AntiAffinityNamespaceGroupTest.java | 540 ++++++++++++++++++ .../ModularLoadManagerImplTest.java | 14 +- .../service/BacklogQuotaManagerTest.java | 2 +- .../broker/service/ReplicatorTestBase.java | 6 +- .../ZooKeeperSessionExpireRecoveryTest.java | 8 +- .../AuthenticatedProducerConsumerTest.java | 13 +- .../client/api/NonPersistentTopicTest.java | 9 +- .../client/api/TlsProducerConsumerBase.java | 5 +- .../proxy/ProxyAuthorizationTest.java | 7 +- .../apache/pulsar/client/admin/Clusters.java | 132 +++++ .../pulsar/client/admin/Namespaces.java | 72 +++ .../client/admin/internal/ClustersImpl.java | 46 ++ .../client/admin/internal/NamespacesImpl.java | 49 ++ .../apache/pulsar/admin/cli/CmdClusters.java | 85 +++ .../pulsar/admin/cli/CmdNamespaces.java | 64 +++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 32 ++ .../common/policies/data/FailureDomain.java | 52 ++ .../pulsar/common/policies/data/Policies.java | 5 +- ...roxyAuthenticatedProducerConsumerTest.java | 5 +- 38 files changed, 1955 insertions(+), 89 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java diff --git a/conf/broker.conf b/conf/broker.conf index 20da0a9104b0d..28e4296bd38f3 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -46,6 +46,9 @@ advertisedAddress= # Name of the cluster to which this broker belongs to clusterName= +# Enable cluster's failure-domain which can distribute brokers into logical region +failureDomainsEnabled=false + # Zookeeper session timeout in milliseconds zooKeeperSessionTimeoutMillis=30000 diff --git a/conf/standalone.conf b/conf/standalone.conf index ce35f2ebdb71c..3dbd8bbce8e3f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -39,6 +39,9 @@ advertisedAddress= # Name of the cluster to which this broker belongs to clusterName=standalone +# Enable cluster's failure-domain which can distribute brokers into logical region +failureDomainsEnabled=false + # Zookeeper session timeout in milliseconds zooKeeperSessionTimeoutMillis=30000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 069933cf1df91..e4d6c2fe91603 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -61,6 +61,9 @@ public class ServiceConfiguration implements PulsarConfiguration { // Name of the cluster to which this broker belongs to @FieldContext(required = true) private String clusterName; + // Enable cluster's failure-domain which can distribute brokers into logical region + @FieldContext(dynamic = true) + private boolean failureDomainsEnabled = false; // Zookeeper session timeout in milliseconds private long zooKeeperSessionTimeoutMillis = 30000; // Time to wait for broker graceful shutdown. After this time elapses, the @@ -468,6 +471,14 @@ public void setClusterName(String clusterName) { this.clusterName = clusterName; } + public boolean isFailureDomainsEnabled() { + return failureDomainsEnabled; + } + + public void setFailureDomainsEnabled(boolean failureDomainsEnabled) { + this.failureDomainsEnabled = failureDomainsEnabled; + } + public long getBrokerShutdownTimeoutMs() { return brokerShutdownTimeoutMs; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java index 1c8ad8099dc28..41a8f932cca8a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java @@ -21,8 +21,10 @@ import java.util.Map; import org.apache.bookkeeper.util.ZkUtils; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PropertyAdmin; @@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Maps; /** * ConfigurationCacheService maintains a local in-memory cache of all the configurations and policies stored in @@ -53,13 +56,21 @@ public class ConfigurationCacheService { private ZooKeeperDataCache policiesCache; private ZooKeeperDataCache clustersCache; private ZooKeeperChildrenCache clustersListCache; + private ZooKeeperChildrenCache failureDomainListCache; private ZooKeeperDataCache namespaceIsolationPoliciesCache; + private ZooKeeperDataCache failureDomainCache; public static final String POLICIES = "policies"; + public static final String FAILURE_DOMAIN = "failureDomain"; + public final String CLUSTER_FAILURE_DOMAIN_ROOT; public static final String POLICIES_ROOT = "/admin/policies"; private static final String CLUSTERS_ROOT = "/admin/clusters"; public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException { + this(cache, null); + } + + public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName) throws PulsarServerException { this.cache = cache; initZK(); @@ -86,6 +97,12 @@ public ClusterData deserialize(String path, byte[] content) throws Exception { }; this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT); + + CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN; + if (isNotBlank(configuredClusterName)) { + createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT); + this.failureDomainListCache = new ZooKeeperChildrenCache(cache, CLUSTER_FAILURE_DOMAIN_ROOT); + } this.namespaceIsolationPoliciesCache = new ZooKeeperDataCache(cache) { @Override @@ -96,6 +113,31 @@ public NamespaceIsolationPolicies deserialize(String path, byte[] content) throw })); } }; + + this.failureDomainCache = new ZooKeeperDataCache(cache) { + @Override + public FailureDomain deserialize(String path, byte[] content) throws Exception { + return ObjectMapperFactory.getThreadLocal().readValue(content, FailureDomain.class); + } + }; + } + + private void createFailureDomainRoot(ZooKeeper zk, String path) { + try { + if (zk.exists(path, false) == null) { + try { + byte[] data = "".getBytes(); + ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + LOG.info("Successfully created failure-domain znode at {}", path); + } catch (KeeperException.NodeExistsException e) { + // Ok + } + } + } catch (KeeperException.NodeExistsException e) { + // Ok + } catch (Exception e) { + LOG.warn("Failed to create failure-domain znode {} ", path, e); + } } private void initZK() throws PulsarServerException { @@ -138,6 +180,10 @@ public ZooKeeperChildrenCache clustersListCache() { return this.clustersListCache; } + public ZooKeeperChildrenCache failureDomainListCache() { + return this.failureDomainListCache; + } + public ZooKeeper getZooKeeper() { return this.cache.getZooKeeper(); } @@ -145,4 +191,8 @@ public ZooKeeper getZooKeeper() { public ZooKeeperDataCache namespaceIsolationPoliciesCache() { return this.namespaceIsolationPoliciesCache; } + + public ZooKeeperDataCache failureDomainCache() { + return this.failureDomainCache; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2d7f99b6f50ba..6cd56abccd3d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -417,7 +417,7 @@ private void startZkCacheService() throws PulsarServerException { throw new PulsarServerException(e); } - this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache()); + this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache(), this.config.getClusterName()); this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index f542a10f03c1a..f04d15bb63d2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.admin; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; import java.net.MalformedURLException; import java.net.URI; @@ -45,6 +47,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PropertyAdmin; @@ -64,7 +67,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; public abstract class AdminResource extends PulsarWebResource { private static final Logger log = LoggerFactory.getLogger(AdminResource.class); @@ -202,6 +204,7 @@ protected List getListOfNamespaces(String property) throws Exception { return namespaces; } + /** * Redirect the call to the specified broker * @@ -286,6 +289,14 @@ protected ZooKeeperDataCache namespaceIsolationPolic return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache(); } + protected ZooKeeperDataCache failureDomainCache() { + return pulsar().getConfigurationCache().failureDomainCache(); + } + + protected ZooKeeperChildrenCache failureDomainListCache() { + return pulsar().getConfigurationCache().failureDomainListCache(); + } + protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace, String destination, boolean authoritative) { DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination); @@ -354,5 +365,14 @@ public PartitionedTopicMetadata deserialize(String key, byte[] content) throws E } return metadataFuture; } - + + protected void validateClusterExists(String cluster) { + try { + if (!clustersCache().get(path("clusters", cluster)).isPresent()) { + throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); + } + } catch (Exception e) { + throw new RestException(e); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java index b489f5ff98877..d00826a20c709 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -38,14 +40,17 @@ import javax.ws.rs.core.Response.Status; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -53,12 +58,15 @@ import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.databind.JsonMappingException; +import com.google.common.collect.Maps; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.FAILURE_DOMAIN; + @Path("/clusters") @Api(value = "/clusters", description = "Cluster admin apis", tags = "clusters") @Produces(MediaType.APPLICATION_JSON) @@ -259,6 +267,7 @@ public void deleteCluster(@PathParam("cluster") String cluster) { try { String clusterPath = path("clusters", cluster); + deleteFailureDomain(clusterPath); globalZk().delete(clusterPath, -1); globalZkCache().invalidate(clusterPath); log.info("[{}] Deleted cluster {}", clientAppId(), cluster); @@ -271,6 +280,25 @@ public void deleteCluster(@PathParam("cluster") String cluster) { } } + private void deleteFailureDomain(String clusterPath) { + try { + String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN); + if (globalZk().exists(failureDomain, false) == null) { + return; + } + for (String domain : globalZk().getChildren(failureDomain, false)) { + String domainPath = joinPath(failureDomain, domain); + globalZk().delete(domainPath, -1); + } + globalZk().delete(failureDomain, -1); + failureDomainCache().clear(); + failureDomainListCache().clear(); + } catch (Exception e) { + log.warn("Failed to delete failure-domain under cluster {}", clusterPath); + throw new RestException(e); + } + } + @GET @Path("/{cluster}/namespaceIsolationPolicies") @ApiOperation(value = "Get the namespace isolation policies assigned in the cluster", response = NamespaceIsolationData.class, responseContainer = "Map") @@ -296,16 +324,6 @@ public Map getNamespaceIsolationPolicies(@PathPa } } - private void validateClusterExists(String cluster) { - try { - if (!clustersCache().get(path("clusters", cluster)).isPresent()) { - throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist."); - } - } catch (Exception e) { - throw new RestException(e); - } - } - @GET @Path("/{cluster}/namespaceIsolationPolicies/{policyName}") @ApiOperation(value = "Get a single namespace isolation policy assigned in the cluster", response = NamespaceIsolationData.class) @@ -356,7 +374,7 @@ public void setNamespaceIsolationPolicy(@PathParam("cluster") String cluster, NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() .get(nsIsolationPolicyPath).orElseGet(() -> { try { - this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath); + this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap())); return new NamespaceIsolationPolicies(); } catch (KeeperException | InterruptedException e) { throw new RestException(e); @@ -386,23 +404,26 @@ public void setNamespaceIsolationPolicy(@PathParam("cluster") String cluster, } } - private void createNamespaceIsolationPolicyNode(String nsIsolationPolicyPath) - throws KeeperException, InterruptedException { + private boolean createZnodeIfNotExist(String path, Optional value) throws KeeperException, InterruptedException { // create persistent node on ZooKeeper - if (globalZk().exists(nsIsolationPolicyPath, false) == null) { + if (globalZk().exists(path, false) == null) { // create all the intermediate nodes try { - ZkUtils.createFullPathOptimistic(globalZk(), nsIsolationPolicyPath, - jsonMapper().writeValueAsBytes(Collections.emptyMap()), Ids.OPEN_ACL_UNSAFE, + ZkUtils.createFullPathOptimistic(globalZk(), path, + value.isPresent() ? jsonMapper().writeValueAsBytes(value.get()) : null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + return true; } catch (KeeperException.NodeExistsException nee) { - log.debug("Other broker preempted the full path [{}] already. Continue...", nsIsolationPolicyPath); + if(log.isDebugEnabled()) { + log.debug("Other broker preempted the full path [{}] already. Continue...", path); + } } catch (JsonGenerationException e) { // ignore json error as it is empty hash } catch (JsonMappingException e) { } catch (IOException e) { } } + return false; } @DELETE @@ -422,7 +443,7 @@ public void deleteNamespaceIsolationPolicy(@PathParam("cluster") String cluster, NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() .get(nsIsolationPolicyPath).orElseGet(() -> { try { - this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath); + this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap())); return new NamespaceIsolationPolicies(); } catch (KeeperException | InterruptedException e) { throw new RestException(e); @@ -446,6 +467,157 @@ public void deleteNamespaceIsolationPolicy(@PathParam("cluster") String cluster, } } + @POST + @Path("/{cluster}/failureDomains/{domainName}") + @ApiOperation(value = "Set cluster's failure Domain") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 409, message = "Broker already exist into other domain"), + @ApiResponse(code = 404, message = "Cluster doesn't exist") }) + public void setFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName, + FailureDomain domain) throws Exception { + validateSuperUserAccess(); + validateClusterExists(cluster); + validateBrokerExistsInOtherDomain(cluster, domainName, domain); + + try { + String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName); + if (this.createZnodeIfNotExist(domainPath, Optional.ofNullable(domain))) { + // clear domains-children cache + this.failureDomainListCache().clear(); + } else { + globalZk().setData(domainPath, jsonMapper().writeValueAsBytes(domain), -1); + // make sure that the domain-cache will be refreshed for the next read access + failureDomainCache().invalidate(domainPath); + } + } catch (KeeperException.NoNodeException nne) { + log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster, + domainName); + throw new RestException(Status.NOT_FOUND, + "Domain " + domainName + " for cluster " + cluster + " does not exist"); + } catch (Exception e) { + log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e); + throw new RestException(e); + } + } + + @GET + @Path("/{cluster}/failureDomains") + @ApiOperation(value = "Get the cluster failure domains", response = FailureDomain.class, responseContainer = "Map") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) + public Map getFailureDomains(@PathParam("cluster") String cluster) throws Exception { + validateSuperUserAccess(); + + Map domains = Maps.newHashMap(); + try { + final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT; + for (String domainName : failureDomainListCache().get()) { + try { + Optional domain = failureDomainCache() + .get(joinPath(failureDomainRootPath, domainName)); + if (domain.isPresent()) { + domains.put(domainName, domain.get()); + } + } catch (Exception e) { + log.warn("Failed to get domain {}", domainName, e); + } + } + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e); + return Collections.emptyMap(); + } catch (Exception e) { + log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + return domains; + } + + @GET + @Path("/{cluster}/failureDomains/{domainName}") + @ApiOperation(value = "Get a domain in a cluster", response = FailureDomain.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Domain doesn't exist"), + @ApiResponse(code = 412, message = "Cluster doesn't exist") }) + public FailureDomain getDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName) + throws Exception { + validateSuperUserAccess(); + validateClusterExists(cluster); + + try { + final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT; + return failureDomainCache().get(joinPath(failureDomainRootPath, domainName)) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, + "Domain " + domainName + " for cluster " + cluster + " does not exist")); + } catch (RestException re) { + throw re; + } catch (Exception e) { + log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, e); + throw new RestException(e); + } + } + + @DELETE + @Path("/{cluster}/failureDomains/{domainName}") + @ApiOperation(value = "Delete cluster's failure omain") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission or plicy is read only"), + @ApiResponse(code = 412, message = "Cluster doesn't exist") }) + public void deleteFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName) + throws Exception { + validateSuperUserAccess(); + validateClusterExists(cluster); + + try { + final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName); + globalZk().delete(domainPath, -1); + // clear domain cache + failureDomainCache().invalidate(domainPath); + failureDomainListCache().clear(); + } catch (KeeperException.NoNodeException nne) { + log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster); + throw new RestException(Status.NOT_FOUND, + "Domain-name " + domainName + " or cluster " + cluster + " does not exist"); + } catch (Exception e) { + log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, e); + throw new RestException(e); + } + } + + private void validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName, + final FailureDomain inputDomain) { + if (inputDomain != null && inputDomain.brokers != null) { + try { + final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT; + for (String domainName : failureDomainListCache().get()) { + if (inputDomainName.equals(domainName)) { + continue; + } + try { + Optional domain = failureDomainCache().get(joinPath(failureDomainRootPath, domainName)); + if (domain.isPresent() && domain.get().brokers != null) { + List duplicateBrokers = domain.get().brokers.stream().parallel() + .filter(inputDomain.brokers::contains).collect(Collectors.toList()); + if (!duplicateBrokers.isEmpty()) { + throw new RestException(Status.CONFLICT, + duplicateBrokers + " already exist into " + domainName); + } + } + } catch (Exception e) { + if (e instanceof RestException) { + throw e; + } + log.warn("Failed to get domain {}", domainName, e); + } + } + } catch (KeeperException.NoNodeException e) { + if (log.isDebugEnabled()) { + log.debug("[{}] Domain is not configured for cluster", clientAppId(), e); + } + } catch (Exception e) { + log.error("[{}] Failed to get domains for cluster {}", clientAppId(), e); + throw new RestException(e); + } + } + } + private static final Logger log = LoggerFactory.getLogger(Clusters.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java index cb8cf72a8dcd4..cb8f982d8a21b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java @@ -18,8 +18,11 @@ */ package org.apache.pulsar.broker.admin; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import java.net.URI; @@ -91,8 +94,6 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; - @Path("/namespaces") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @@ -668,6 +669,139 @@ public void setNamespaceMessageTTL(@PathParam("property") String property, @Path } } + @POST + @Path("/{property}/{cluster}/{namespace}/antiAffinity") + @ApiOperation(value = "Set anti-affinity group for a namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), + @ApiResponse(code = 412, message = "Invalid antiAffinityGroup") }) + public void setNamespaceAntiAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, String antiAffinityGroup) { + validateAdminAccessOnProperty(property); + validatePoliciesReadOnlyAccess(); + + log.info("[{}] Setting anti-affinity group {} for {}/{}/{}", clientAppId(), antiAffinityGroup, property, + cluster, namespace); + + if (isBlank(antiAffinityGroup)) { + throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty"); + } + + NamespaceName nsName = NamespaceName.get(property, cluster, namespace); + Entry policiesNode = null; + + try { + // Force to read the data s.t. the watch to the cache content is setup. + policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace)) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist")); + policiesNode.getKey().antiAffinityGroup = antiAffinityGroup; + + // Write back the new policies into zookeeper + globalZk().setData(path(POLICIES, property, cluster, namespace), + jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); + policiesCache().invalidate(path(POLICIES, property, cluster, namespace)); + + log.info("[{}] Successfully updated the antiAffinityGroup {} on namespace {}/{}/{}", clientAppId(), + antiAffinityGroup, property, cluster, namespace); + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to update the antiAffinityGroup for namespace {}/{}/{}: does not exist", clientAppId(), + property, cluster, namespace); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn( + "[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{} expected policy node version={} : concurrent modification", + clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion()); + + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (Exception e) { + log.error("[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{}", clientAppId(), property, cluster, + namespace, e); + throw new RestException(e); + } + } + + @GET + @Path("/{property}/{cluster}/{namespace}/antiAffinity") + @ApiOperation(value = "Get anti-affinity group of a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) + public String getNamespaceAntiAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { + validateAdminAccessOnProperty(property); + return getNamespacePolicies(property, cluster, namespace).antiAffinityGroup; + } + + @GET + @Path("{cluster}/antiAffinity/{group}") + @ApiOperation(value = "Get all namespaces that are grouped by given anti-affinity group in a given cluster. api can be only accessed by admin of any of the existing property") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.") }) + public List getAntiAffinityNamespaces(@PathParam("cluster") String cluster, + @PathParam("group") String antiAffinityGroup, @QueryParam("property") String property) { + validateAdminAccessOnProperty(property); + + log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), property, antiAffinityGroup, cluster); + + if (isBlank(antiAffinityGroup)) { + throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty."); + } + validateClusterExists(cluster); + List namespaces = Lists.newArrayList(); + try { + for (String prop : globalZk().getChildren(POLICIES_ROOT, false)) { + for (String namespace : globalZk().getChildren(path(POLICIES, prop, cluster), false)) { + Optional policies = policiesCache() + .get(AdminResource.path(POLICIES, prop, cluster, namespace)); + if (policies.isPresent() && antiAffinityGroup.equalsIgnoreCase(policies.get().antiAffinityGroup)) { + namespaces.add(String.format("%s/%s/%s", prop, cluster, namespace)); + } + } + } + } catch (Exception e) { + log.warn("Failed to list of properties/namespace from global-zk", e); + } + return namespaces; + } + + @DELETE + @Path("/{property}/{cluster}/{namespace}/antiAffinity") + @ApiOperation(value = "Remove anti-affinity group of a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification") }) + public void removeNamespaceAntiAffinityGroup(@PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { + validateAdminAccessOnProperty(property); + validatePoliciesReadOnlyAccess(); + + log.info("[{}] Deleting anti-affinity group for {}/{}/{}", clientAppId(), property, cluster, namespace); + + try { + Stat nodeStat = new Stat(); + final String path = path(POLICIES, property, cluster, namespace); + byte[] content = globalZk().getData(path, null, nodeStat); + Policies policies = jsonMapper().readValue(content, Policies.class); + policies.antiAffinityGroup = null; + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); + policiesCache().invalidate(path(POLICIES, property, cluster, namespace)); + log.info("[{}] Successfully removed anti-affinity group for a namespace={}/{}/{}", clientAppId(), property, + cluster, namespace); + + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: does not exist", clientAppId(), + property, cluster, namespace); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: concurrent modification", + clientAppId(), property, cluster, namespace); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (Exception e) { + log.error("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}", clientAppId(), property, + cluster, namespace, e); + throw new RestException(e); + } + } + @POST @Path("/{property}/{cluster}/{namespace}/deduplication") @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 1a2d636b86849..ffdaac64f8e7c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -19,29 +19,44 @@ package org.apache.pulsar.broker.loadbalance.impl; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import org.apache.pulsar.broker.BrokerData; +import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; import org.apache.pulsar.broker.loadbalance.LoadData; import org.apache.pulsar.broker.stats.metrics.JvmMetrics; +import org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; +import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.beust.jcommander.internal.Lists; +import com.google.common.collect.Maps; + /** * This class contains code which in shared between the two load manager implementations. */ @@ -60,13 +75,15 @@ public class LoadManagerShared { // update LoadReport at most every 5 seconds public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5); + private static final String DEFAULT_DOMAIN = "default"; + // Don't allow construction: static method namespace only. private LoadManagerShared() { } // Determines the brokers available for the given service unit according to the given policies. // The brokers are put into brokerCandidateCache. - public static synchronized void applyPolicies(final ServiceUnitId serviceUnit, + public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit, final SimpleResourceAllocationPolicies policies, final Set brokerCandidateCache, final Set availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) { @@ -264,6 +281,227 @@ public static void removeMostServicingBrokersForNamespace(final String assignedB } } + /** + * It tries to filter out brokers which own namespace with same anti-affinity-group as given namespace. If all the + * domains own namespace with same anti-affinity group then it will try to keep brokers with domain that has least + * number of namespaces. It also tries to keep brokers which has least number of namespace with in domain. + * eg. + *
+     * Before:
+     * Domain-count  Brokers-count
+     * ____________  ____________
+     * d1-3          b1-2,b2-1
+     * d2-3          b3-2,b4-1
+     * d3-4          b5-2,b6-2
+     * 
+     * After filtering: "candidates" brokers
+     * Domain-count  Brokers-count
+     * ____________  ____________
+     * d1-3          b2-1
+     * d2-3          b4-1
+     * 
+     * "candidate" broker to own anti-affinity-namespace = b2 or b4
+     * 
+     * 
+ * + * @param pulsar + * @param assignedBundleName + * @param candidates + * @param brokerToNamespaceToBundleRange + */ + public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsar, final String assignedBundleName, + final Set candidates, final Map>> brokerToNamespaceToBundleRange, + Map brokerToDomainMap) { + if (candidates.isEmpty()) { + return; + } + final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName); + try { + final Map brokerToAntiAffinityNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, + namespaceName, brokerToNamespaceToBundleRange).get(30, TimeUnit.SECONDS); + if (brokerToAntiAffinityNamespaceCount == null) { + // none of the broker owns anti-affinity-namespace so, none of the broker will be filtered + return; + } + if (pulsar.getConfiguration().isFailureDomainsEnabled()) { + // this will remove all the brokers which are part of domains that don't have least number of + // anti-affinity-namespaces + filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(brokerToAntiAffinityNamespaceCount, candidates, + brokerToDomainMap); + } + // now, "candidates" has list of brokers which are part of domain that can accept this namespace. now, + // with in these domains, remove brokers which don't have least number of namespaces. so, brokers with least + // number of namespace can be selected + int leastNamaespaceCount = Integer.MAX_VALUE; + for (final String broker : candidates) { + if (brokerToAntiAffinityNamespaceCount.containsKey(broker)) { + Integer namespaceCount = brokerToAntiAffinityNamespaceCount.get(broker); + if (namespaceCount == null) { + // Assume that when the namespace is absent, there are no namespace assigned to + // that broker. + leastNamaespaceCount = 0; + break; + } + leastNamaespaceCount = Math.min(leastNamaespaceCount, namespaceCount); + } else { + // Assume non-present brokers have 0 bundles. + leastNamaespaceCount = 0; + break; + } + } + // filter out broker based on namespace distribution + if (leastNamaespaceCount == 0) { + candidates.removeIf(broker -> brokerToAntiAffinityNamespaceCount.containsKey(broker) + && brokerToAntiAffinityNamespaceCount.get(broker) > 0); + } else { + final int finalLeastNamespaceCount = leastNamaespaceCount; + candidates + .removeIf(broker -> brokerToAntiAffinityNamespaceCount.get(broker) != finalLeastNamespaceCount); + } + } catch (Exception e) { + log.error("Failed to filter anti-affinity group namespace {}", e.getMessage()); + } + } + + /** + * It computes least number of namespace owned by any of the domain and then it filters out all the domains that own + * namespaces more than this count. + * + * @param brokerToAntiAffinityNamespaceCount + * @param candidates + * @param brokerToDomainMap + */ + private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces( + Map brokerToAntiAffinityNamespaceCount, Set candidates, + Map brokerToDomainMap) { + + if (brokerToDomainMap == null || brokerToDomainMap.isEmpty()) { + return; + } + + final Map domainNamespaceCount = Maps.newHashMap(); + int leastNamespaceCount = Integer.MAX_VALUE; + candidates.forEach(broker -> { + final String domain = brokerToDomainMap.getOrDefault(broker, DEFAULT_DOMAIN); + final int count = brokerToAntiAffinityNamespaceCount.getOrDefault(broker, 0); + domainNamespaceCount.compute(domain, (domainName, nsCount) -> nsCount == null ? count : nsCount + count); + }); + // find leastNameSpaceCount + for (Entry domainNsCountEntry : domainNamespaceCount.entrySet()) { + if (domainNsCountEntry.getValue() < leastNamespaceCount) { + leastNamespaceCount = domainNsCountEntry.getValue(); + } + } + final int finalLeastNamespaceCount = leastNamespaceCount; + // only keep domain brokers which has leastNamespaceCount + candidates.removeIf(broker -> { + Integer nsCount = domainNamespaceCount.get(brokerToDomainMap.getOrDefault(broker, DEFAULT_DOMAIN)); + return nsCount != null && nsCount != finalLeastNamespaceCount; + }); + } + + /** + * It returns map of broker and count of namespace that are belong to the same anti-affinity group as given + * {@param namespaceName} + * + * @param pulsar + * @param namespaceName + * @param brokerToNamespaceToBundleRange + * @return + */ + public static CompletableFuture> getAntiAffinityNamespaceOwnedBrokers( + final PulsarService pulsar, String namespaceName, + Map>> brokerToNamespaceToBundleRange) { + + CompletableFuture> antiAffinityNsBrokersResult = new CompletableFuture<>(); + ZooKeeperDataCache policiesCache = pulsar.getConfigurationCache().policiesCache(); + + policiesCache.getAsync(path(POLICIES, namespaceName)).thenAccept(policies -> { + if (!policies.isPresent() || StringUtils.isBlank(policies.get().antiAffinityGroup)) { + antiAffinityNsBrokersResult.complete(null); + return; + } + final String antiAffinityGroup = policies.get().antiAffinityGroup; + final Map brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>(); + final List> futures = Lists.newArrayList(); + brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> { + nsToBundleRange.forEach((ns, bundleRange) -> { + CompletableFuture future = new CompletableFuture<>(); + futures.add(future); + policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> { + if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().antiAffinityGroup)) { + brokerToAntiAffinityNamespaceCount.compute(broker, + (brokerName, count) -> count == null ? 1 : count + 1); + } + future.complete(null); + }).exceptionally(ex -> { + future.complete(null); + return null; + }); + }); + }); + FutureUtil.waitForAll(futures) + .thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount)); + }).exceptionally(ex -> { + // namespace-policies has not been created yet + antiAffinityNsBrokersResult.complete(null); + return null; + }); + return antiAffinityNsBrokersResult; + } + + /** + * + * It checks if given anti-affinity namespace should be unloaded by broker due to load-shedding. If all the brokers + * are owning same number of anti-affinity namespaces then unloading this namespace again ends up at the same broker + * from which it was unloaded. So, this util checks that given namespace should be unloaded only if it can be loaded + * by different broker. + * + * @param namespace + * @param bundle + * @param currentBroker + * @param pulsar + * @param brokerToNamespaceToBundleRange + * @param candidateBroekrs + * @return + * @throws Exception + */ + public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker, + final PulsarService pulsar, Map>> brokerToNamespaceToBundleRange, + Set candidateBroekrs) throws Exception { + + Map brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace, + brokerToNamespaceToBundleRange).get(10, TimeUnit.SECONDS); + if (brokerNamespaceCount != null && !brokerNamespaceCount.isEmpty()) { + int leastNsCount = Integer.MAX_VALUE; + int currentBrokerNsCount = 0; + + for (String broker : candidateBroekrs) { + int nsCount = brokerNamespaceCount.getOrDefault(broker, 0); + if (currentBroker.equals(broker)) { + currentBrokerNsCount = nsCount; + } + if (leastNsCount > nsCount) { + leastNsCount = nsCount; + } + } + // check if there is any other broker has less number of ns + if (leastNsCount == 0 || currentBrokerNsCount > leastNsCount) { + return true; + } + // check if all the brokers having same number of ns-count then broker can't unload + int leastNsOwnerBrokers = 0; + for (String broker : candidateBroekrs) { + if (leastNsCount == brokerNamespaceCount.getOrDefault(broker, 0)) { + leastNsOwnerBrokers++; + } + } + // if all candidate brokers own same-number of ns then broker can't unload + return candidateBroekrs.size() != leastNsOwnerBrokers; + } + return true; + } + public interface BrokerTopicLoadingPredicate { boolean isEnablePersistentTopics(String brokerUrl); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 9d5cf30658637..b8519d85b0421 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.loadbalance.impl; import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.broker.web.PulsarWebResource.path; import java.io.IOException; import java.util.ArrayList; @@ -27,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -57,6 +60,8 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; @@ -75,6 +80,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; + import io.netty.util.concurrent.DefaultThreadFactory; public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener { @@ -170,7 +177,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach // check if given broker can load persistent/non-persistent topic private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate; - + + private Map brokerToFailureDomainMap; private static final Deserializer loadReportDeserializer = (key, content) -> jsonMapper() .readValue(content, LocalBrokerData.class); @@ -188,6 +196,7 @@ public ModularLoadManagerImpl() { loadSheddingPipeline.add(new OverloadShedder(conf)); preallocatedBundleToBroker = new ConcurrentHashMap<>(); scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager")); + this.brokerToFailureDomainMap = Maps.newHashMap(); this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() { @Override @@ -214,6 +223,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { * The service to initialize with. */ public void initialize(final PulsarService pulsar) { + this.pulsar = pulsar; availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(), LoadManager.LOADBALANCE_BROKERS_ROOT); availableActiveBrokers.registerListener(new ZooKeeperCacheListener>() { @@ -266,9 +276,15 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception placementStrategy = ModularLoadManagerStrategy.create(conf); policies = new SimpleResourceAllocationPolicies(pulsar); - this.pulsar = pulsar; zkClient = pulsar.getZkClient(); filterPipeline.add(new BrokerVersionFilter()); + + refreshBrokerToFailureDomainMap(); + // register listeners for domain changes + pulsar.getConfigurationCache().failureDomainListCache() + .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap())); + pulsar.getConfigurationCache().failureDomainCache() + .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap())); } /** @@ -421,7 +437,7 @@ private boolean needBrokerDataUpdate() { } // Update both the broker data and the bundle data. - private void updateAll() { + public void updateAll() { if (log.isDebugEnabled()) { log.debug("Updating broker and bundle data for loadreport"); } @@ -575,10 +591,14 @@ public synchronized void doLoadShedding() { for (Map.Entry entry : bundlesToUnload.entrySet()) { final String broker = entry.getKey(); final String bundle = entry.getValue(); - log.info("Unloading bundle: {}", bundle); - pulsar.getAdminClient().namespaces().unloadNamespaceBundle( - LoadManagerShared.getNamespaceNameFromBundleName(bundle), - LoadManagerShared.getBundleRangeFromBundleName(bundle)); + + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { + continue; + } + log.info("Unloading bundle: {} from broker {}", bundle, broker); + pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange); loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis()); } } catch (Exception e) { @@ -589,6 +609,32 @@ public synchronized void doLoadShedding() { } } + public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) { + try { + Optional nsPolicies = pulsar.getConfigurationCache().policiesCache() + .get(path(POLICIES, namespace)); + if (!nsPolicies.isPresent() || StringUtils.isBlank(nsPolicies.get().antiAffinityGroup)) { + return true; + } + + synchronized (brokerCandidateCache) { + brokerCandidateCache.clear(); + ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory() + .getBundle(namespace, bundle); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), brokerTopicLoadingPredicate); + return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar, + brokerToNamespaceToBundleRange, brokerCandidateCache); + } + + } catch (Exception e) { + log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}", namespace, bundle, + currentBroker, e.getMessage()); + + } + return true; + } + /** * As the leader broker, attempt to automatically detect and split hot namespace bundles. */ @@ -657,11 +703,18 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) { final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle)); brokerCandidateCache.clear(); - LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), brokerTopicLoadingPredicate); + // filter brokers which owns topic higher than threshold LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData, conf.getLoadBalancerBrokerMaxTopics()); + + // distribute namespaces to domain and brokers according to anti-affinity-group + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), brokerCandidateCache, + brokerToNamespaceToBundleRange, brokerToFailureDomainMap); + // distribute bundles evenly to candidate-brokers + LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache, brokerToNamespaceToBundleRange); log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle); @@ -673,13 +726,13 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) { } } catch ( BrokerFilterException x ) { // restore the list of brokers to the full set - LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), brokerTopicLoadingPredicate); } if ( brokerCandidateCache.isEmpty() ) { // restore the list of brokers to the full set - LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), brokerTopicLoadingPredicate); } @@ -693,7 +746,7 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) { final double maxUsage = loadData.getBrokerData().get(broker).getLocalData().getMaxResourceUsage(); if (maxUsage > overloadThreshold) { // All brokers that were in the filtered list were overloaded, so check if there is a better broker - LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(), brokerTopicLoadingPredicate); broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); } @@ -871,4 +924,33 @@ private long getBrokerZnodeOwner() { } return 0; } + + private void refreshBrokerToFailureDomainMap() { + if (!pulsar.getConfiguration().isFailureDomainsEnabled()) { + return; + } + final String clusterDomainRootPath = pulsar.getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT; + try { + synchronized (brokerToFailureDomainMap) { + Map tempBrokerToFailureDomainMap = Maps.newHashMap(); + for (String domainName : pulsar.getConfigurationCache().failureDomainListCache().get()) { + try { + Optional domain = pulsar.getConfigurationCache().failureDomainCache() + .get(clusterDomainRootPath + "/" + domainName); + if (domain.isPresent()) { + for (String broker : domain.get().brokers) { + tempBrokerToFailureDomainMap.put(broker, domainName); + } + } + } catch (Exception e) { + log.warn("Failed to get domain {}", domainName, e); + } + } + this.brokerToFailureDomainMap = tempBrokerToFailureDomainMap; + } + log.info("Cluster domain refreshed {}", brokerToFailureDomainMap); + } catch (Exception e) { + log.warn("Failed to get domain-list for cluster {}", e.getMessage()); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 338f302ddad3d..bdf0345283f4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -913,7 +913,7 @@ private Multimap getFinalCandidates(ServiceUnitId serviceUni } brokerCandidateCache.clear(); try { - LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache, + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache, brokerTopicLoadingPredicate); } catch (Exception e) { log.warn("Error when trying to apply policies: {}", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index c22758dd1f887..6700649616212 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -168,7 +168,7 @@ protected void validateAdminAccessOnProperty(String property) { } catch (RestException e) { throw e; } catch (Exception e) { - log.error("Failed to get property admin data for property"); + log.error("Failed to get property admin data for property {}", property); throw new RestException(e); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index e0d55a6612ebc..73071ff17e8e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -115,7 +115,7 @@ private void createProperty(PulsarAdmin pulsarAdmin) throws PulsarClientException, MalformedURLException, PulsarAdminException { ClusterData clusterData = new ClusterData(); clusterData.setServiceUrl(pulsarAdmin.getServiceUrl().toString()); - pulsarAdmins[0].clusters().createCluster("my-cluster", clusterData); + pulsarAdmins[0].clusters().updateCluster("my-cluster", clusterData); Set allowedClusters = new HashSet<>(); allowedClusters.add("my-cluster"); PropertyAdmin adminConfig = new PropertyAdmin(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 795d8071abc37..444d224a34063 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -172,21 +172,23 @@ public Object[][] topicTypeProvider() { public void clusters() throws Exception { admin.clusters().createCluster("usw", new ClusterData("http://broker.messaging.use.example.com" + ":" + BROKER_WEBSERVICE_PORT)); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw")); + // "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates + // failure-domain znode of this default cluster + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw")); assertEquals(admin.clusters().getCluster("use"), new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT)); admin.clusters().updateCluster("usw", new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT)); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw")); assertEquals(admin.clusters().getCluster("usw"), new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT)); admin.clusters().updateCluster("usw", new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT, "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS)); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw")); assertEquals(admin.clusters().getCluster("usw"), new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT, "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS)); @@ -194,11 +196,11 @@ public void clusters() throws Exception { admin.clusters().deleteCluster("usw"); Thread.sleep(300); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use")); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use")); admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); admin.clusters().deleteCluster("use"); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList()); + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test")); // Check name validation try { @@ -382,7 +384,9 @@ public void brokers() throws Exception { admin.namespaces().deleteNamespace("prop-xyz/use/ns1"); admin.clusters().deleteCluster("use"); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList()); + // "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates + // failure-domain znode of this default cluster + assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test")); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 2362f44f4a424..5825783564d9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.apache.commons.lang3.StringUtils.isBlank; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -25,7 +26,9 @@ import static org.testng.Assert.fail; import java.net.URL; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -54,6 +57,7 @@ import org.apache.pulsar.common.naming.DestinationDomain; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -582,7 +586,7 @@ public void testPeerCluster() throws Exception { assertTrue(e instanceof PreconditionFailedException); } } - + /** * It validates that peer-cluster can't coexist in replication-cluster list * @@ -640,4 +644,69 @@ public void testReplicationPeerCluster() throws Exception { admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds); } + @Test + public void clusterFailureDomain() throws PulsarAdminException { + + final String cluster = pulsar.getConfiguration().getClusterName(); + admin.clusters().updateCluster(cluster, + new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls())); + // create + FailureDomain domain = new FailureDomain(); + domain.setBrokers(Sets.newHashSet("b1", "b2", "b3")); + admin.clusters().createFailureDomain(cluster, "domain-1", domain); + admin.clusters().updateFailureDomain(cluster, "domain-1", domain); + + assertEquals(admin.clusters().getFailureDomain(cluster, "domain-1"), domain); + + Map domains = admin.clusters().getFailureDomains(cluster); + assertEquals(domains.size(), 1); + assertTrue(domains.containsKey("domain-1")); + + try { + // try to create domain with already registered brokers + admin.clusters().createFailureDomain(cluster, "domain-2", domain); + fail("should have failed because of brokers are already registered"); + } catch (PulsarAdminException.ConflictException e) { + // Ok + } + + admin.clusters().deleteFailureDomain(cluster, "domain-1"); + assertTrue(admin.clusters().getFailureDomains(cluster).isEmpty()); + + admin.clusters().createFailureDomain(cluster, "domain-2", domain); + domains = admin.clusters().getFailureDomains(cluster); + assertEquals(domains.size(), 1); + assertTrue(domains.containsKey("domain-2")); + } + + @Test + public void namespaceAntiAffinity() throws PulsarAdminException { + final String namespace = "prop-xyz/use/ns1"; + final String antiAffinityGroup = "group"; + assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace))); + admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup); + assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), antiAffinityGroup); + admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace); + assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace))); + + final String ns1 = "prop-xyz/use/antiAG1"; + final String ns2 = "prop-xyz/use/antiAG2"; + final String ns3 = "prop-xyz/use/antiAG3"; + admin.namespaces().createNamespace(ns1); + admin.namespaces().createNamespace(ns2); + admin.namespaces().createNamespace(ns3); + admin.namespaces().setNamespaceAntiAffinityGroup(ns1, antiAffinityGroup); + admin.namespaces().setNamespaceAntiAffinityGroup(ns2, antiAffinityGroup); + admin.namespaces().setNamespaceAntiAffinityGroup(ns3, antiAffinityGroup); + + Set namespaces = new HashSet<>( + admin.namespaces().getAntiAffinityNamespaces("dummy", "use", antiAffinityGroup)); + assertEquals(namespaces.size(), 3); + assertTrue(namespaces.contains(ns1)); + assertTrue(namespaces.contains(ns2)); + assertTrue(namespaces.contains(ns3)); + + List namespaces2 = admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group"); + assertEquals(namespaces2.size(), 0); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 0245b67210ed4..c587033af5d18 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -91,10 +91,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest { private Field uriField; private UriInfo uriInfo; + private final String configClusterName = "use"; public AdminTest() { super(); - conf.setClusterName("use"); + conf.setClusterName(configClusterName); } @Override @@ -186,10 +187,10 @@ public void cleanup() throws Exception { @Test void clusters() throws Exception { - assertEquals(clusters.getClusters(), new ArrayList()); + assertEquals(clusters.getClusters(), Lists.newArrayList(configClusterName)); verify(clusters, never()).validateSuperUserAccess(); - clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com")); + clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com")); verify(clusters, times(1)).validateSuperUserAccess(); // ensure to read from ZooKeeper directly clusters.clustersListCache().clear(); @@ -447,7 +448,7 @@ void properties() throws Exception { assertEquals(properties.getProperties(), Lists.newArrayList()); // Create a namespace to test deleting a non-empty property - clusters.createCluster("use", new ClusterData()); + clusters.updateCluster("use", new ClusterData()); newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), Sets.newHashSet("use")); properties.createProperty("my-property", newPropertyAdmin); @@ -474,7 +475,7 @@ void properties() throws Exception { @Test void brokers() throws Exception { - clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com", + clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com", "https://broker.messaging.use.example.com:4443")); URI requestUri = new URI( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 37ec7ebc8f170..36c9ec2b75f9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -135,7 +135,7 @@ public void setup() throws Exception { doNothing().when(namespaces).validateAdminAccessOnProperty("other-property"); doNothing().when(namespaces).validateAdminAccessOnProperty("new-property"); - admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT)); + admin.clusters().updateCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT)); admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:" + BROKER_WEBSERVICE_PORT)); admin.clusters().createCluster("usc", new ClusterData("http://broker-usc.com:" + BROKER_WEBSERVICE_PORT)); admin.properties().createProperty(this.testProperty, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index 0f79e7509b47e..cee4bbe3310f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -64,7 +64,7 @@ void simple() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false); - admin.clusters().createCluster("c1", new ClusterData()); + admin.clusters().updateCluster("c1", new ClusterData()); admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 67873bc6c5f39..8228d21da4a9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -77,6 +77,7 @@ public abstract class MockedPulsarServiceBaseTest { protected MockZooKeeper mockZookKeeper; protected NonClosableMockBookKeeper mockBookKeeper; protected boolean isTcpLookup = false; + protected final String configClusterName = "test"; private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor; @@ -90,7 +91,7 @@ protected void resetConfig() { this.conf.setBrokerServicePortTls(BROKER_PORT_TLS); this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT); this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS); - this.conf.setClusterName("test"); + this.conf.setClusterName(configClusterName); this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate this.conf.setManagedLedgerCacheSizeMB(8); this.conf.setActiveConsumerFailoverDelayTimeMillis(0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java new file mode 100644 index 0000000000000..ec0bf1f54e048 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -0,0 +1,540 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +import java.lang.reflect.Field; +import java.net.InetAddress; +import java.net.URL; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundles; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.beust.jcommander.internal.Maps; +import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +import junit.framework.Assert; + +public class AntiAffinityNamespaceGroupTest { + private LocalBookkeeperEnsemble bkEnsemble; + + private URL url1; + private PulsarService pulsar1; + private PulsarAdmin admin1; + + private URL url2; + private PulsarService pulsar2; + private PulsarAdmin admin2; + + private String primaryHost; + private String secondaryHost; + + private NamespaceBundleFactory nsFactory; + + private ModularLoadManagerImpl primaryLoadManager; + private ModularLoadManagerImpl secondaryLoadManager; + + private ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, + new LinkedBlockingQueue()); + + private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); + private final int SECONDARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); + private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort(); + private final int SECONDARY_BROKER_PORT = PortManager.nextFreePort(); + private static final Logger log = LoggerFactory.getLogger(AntiAffinityNamespaceGroupTest.class); + + static { + System.setProperty("test.basePort", "16100"); + } + + private static Object getField(final Object instance, final String fieldName) throws Exception { + final Field field = instance.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(instance); + } + + @BeforeMethod + void setup() throws Exception { + + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble.start(); + + // Start broker 1 + ServiceConfiguration config1 = new ServiceConfiguration(); + config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + config1.setClusterName("use"); + config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT); + config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config1.setBrokerServicePort(PRIMARY_BROKER_PORT); + config1.setFailureDomainsEnabled(true); + config1.setLoadBalancerEnabled(true); + createCluster(bkEnsemble.getZkClient(), config1); + pulsar1 = new PulsarService(config1); + + pulsar1.start(); + + primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT); + url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT); + admin1 = new PulsarAdmin(url1, (Authentication) null); + + // Start broker 2 + ServiceConfiguration config2 = new ServiceConfiguration(); + config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + config2.setClusterName("use"); + config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT); + config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config2.setBrokerServicePort(SECONDARY_BROKER_PORT); + config2.setFailureDomainsEnabled(true); + pulsar2 = new PulsarService(config2); + secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), + SECONDARY_BROKER_WEBSERVICE_PORT); + + pulsar2.start(); + + url2 = new URL("http://127.0.0.1" + ":" + SECONDARY_BROKER_WEBSERVICE_PORT); + admin2 = new PulsarAdmin(url2, (Authentication) null); + + primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager"); + secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager"); + nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32()); + Thread.sleep(100); + } + + @AfterMethod + void shutdown() throws Exception { + log.info("--- Shutting down ---"); + executor.shutdown(); + + admin1.close(); + admin2.close(); + + pulsar2.close(); + pulsar1.close(); + + bkEnsemble.stop(); + } + + private void createCluster(ZooKeeper zk, ServiceConfiguration config) throws Exception { + ZkUtils.createFullPathOptimistic(zk, "/admin/clusters/" + config.getClusterName(), + ObjectMapperFactory.getThreadLocal().writeValueAsBytes( + new ClusterData("http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort())), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + @Test + public void testClusterDomain() { + + } + + /** + * + * It verifies anti-affinity-namespace assignment with failure-domain + * + *
+     * Domain     Brokers-count
+     * ________  ____________
+     * domain-0   broker-0,broker-1
+     * domain-1   broker-2,broker-3
+     * 
+     * Anti-affinity-namespace assignment
+     * 
+     * (1) ns0 -> candidate-brokers: b0, b1, b2, b3 => selected b0
+     * (2) ns1 -> candidate-brokers: b2, b3         => selected b2
+     * (3) ns2 -> candidate-brokers: b1, b3         => selected b1
+     * (4) ns3 -> candidate-brokers: b3             => selected b3
+     * (5) ns4 -> candidate-brokers: b0, b1, b2, b3 => selected b0
+     * 
+     * "candidate" broker to own anti-affinity-namespace = b2 or b4
+     * 
+     * 
+ * + */ + @Test + public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception { + + final String namespace = "my-property/use/my-ns"; + final int totalNamespaces = 5; + final String namespaceAntiAffinityGroup = "my-antiaffinity"; + final String bundle = "/0x00000000_0xffffffff"; + final int totalBrokers = 4; + + pulsar1.getConfiguration().setFailureDomainsEnabled(true); + admin1.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + + for (int i = 0; i < totalNamespaces; i++) { + final String ns = namespace + i; + admin1.namespaces().createNamespace(ns); + admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup); + } + + Set brokers = Sets.newHashSet(); + Map brokerToDomainMap = Maps.newHashMap(); + brokers.add("brokerName-0"); + brokerToDomainMap.put("brokerName-0", "domain-0"); + brokers.add("brokerName-1"); + brokerToDomainMap.put("brokerName-1", "domain-0"); + brokers.add("brokerName-2"); + brokerToDomainMap.put("brokerName-2", "domain-1"); + brokers.add("brokerName-3"); + brokerToDomainMap.put("brokerName-3", "domain-1"); + + Set candidate = Sets.newHashSet(); + Map>> brokerToNamespaceToBundleRange = Maps.newHashMap(); + + Assert.assertEquals(brokers.size(), totalBrokers); + + String assignedNamespace = namespace + "0" + bundle; + candidate.addAll(brokers); + + // for namespace-0 all brokers available + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, brokers, + brokerToNamespaceToBundleRange, brokerToDomainMap); + Assert.assertEquals(brokers.size(), totalBrokers); + + // add namespace-0 to broker-0 of domain-0 => state: n0->b0 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-0", namespace + "0", assignedNamespace); + candidate.addAll(brokers); + // for namespace-1 only domain-1 brokers are available as broker-0 already owns namespace-0 + assignedNamespace = namespace + "1" + bundle; + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, brokerToDomainMap); + Assert.assertEquals(candidate.size(), 2); + candidate.forEach(broker -> Assert.assertEquals(brokerToDomainMap.get(broker), "domain-1")); + + // add namespace-1 to broker-2 of domain-1 => state: n0->b0, n1->b2 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-2", namespace + "1", assignedNamespace); + candidate.addAll(brokers); + // for namespace-2 only brokers available are : broker-1 and broker-3 + assignedNamespace = namespace + "2" + bundle; + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, brokerToDomainMap); + Assert.assertEquals(candidate.size(), 2); + Assert.assertTrue(candidate.contains("brokerName-1")); + Assert.assertTrue(candidate.contains("brokerName-3")); + + // add namespace-2 to broker-1 of domain-0 => state: n0->b0, n1->b2, n2->b1 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-1", namespace + "2", assignedNamespace); + candidate.addAll(brokers); + // for namespace-3 only brokers available are : broker-3 + assignedNamespace = namespace + "3" + bundle; + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, brokerToDomainMap); + Assert.assertEquals(candidate.size(), 1); + Assert.assertTrue(candidate.contains("brokerName-3")); + // add namespace-3 to broker-3 of domain-1 => state: n0->b0, n1->b2, n2->b1, n3->b3 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-3", namespace + "3", assignedNamespace); + candidate.addAll(brokers); + // for namespace-4 only brokers available are : all 4 brokers + assignedNamespace = namespace + "4" + bundle; + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, brokerToDomainMap); + Assert.assertEquals(candidate.size(), 4); + } + + /** + * It verifies anti-affinity-namespace assignment without failure-domain enabled + * + *
+     *  Anti-affinity-namespace assignment
+     * 
+     * (1) ns0 -> candidate-brokers: b0, b1, b2     => selected b0
+     * (2) ns1 -> candidate-brokers: b1, b2         => selected b1
+     * (3) ns2 -> candidate-brokers: b2             => selected b2
+     * (5) ns3 -> candidate-brokers: b0, b1, b2     => selected b0
+     * 
+ * + * + * @throws Exception + */ + @Test + public void testAntiAffinityNamespaceFilteringWithoutDomain() throws Exception { + + final String namespace = "my-property/use/my-ns"; + final int totalNamespaces = 5; + final String namespaceAntiAffinityGroup = "my-antiaffinity"; + final String bundle = "/0x00000000_0xffffffff"; + + admin1.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + + for (int i = 0; i < totalNamespaces; i++) { + final String ns = namespace + i; + admin1.namespaces().createNamespace(ns); + admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup); + } + + Set brokers = Sets.newHashSet(); + Set candidate = Sets.newHashSet(); + Map>> brokerToNamespaceToBundleRange = Maps.newHashMap(); + brokers.add("broker-0"); + brokers.add("broker-1"); + brokers.add("broker-2"); + + String assignedNamespace = namespace + "0" + bundle; + + // all brokers available so, candidate will be all 3 brokers + candidate.addAll(brokers); + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, brokers, + brokerToNamespaceToBundleRange, null); + Assert.assertEquals(brokers.size(), 3); + + // add ns-0 to broker-0 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace); + candidate.addAll(brokers); + assignedNamespace = namespace + "1" + bundle; + // available brokers for ns-1 => broker-1, broker-2 + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, null); + Assert.assertEquals(candidate.size(), 2); + Assert.assertTrue(candidate.contains("broker-1")); + Assert.assertTrue(candidate.contains("broker-2")); + + // add ns-1 to broker-1 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace); + candidate.addAll(brokers); + // available brokers for ns-2 => broker-2 + assignedNamespace = namespace + "2" + bundle; + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, null); + Assert.assertEquals(candidate.size(), 1); + Assert.assertTrue(candidate.contains("broker-2")); + + // add ns-2 to broker-2 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace); + candidate.addAll(brokers); + // available brokers for ns-3 => broker-0, broker-1, broker-2 + assignedNamespace = namespace + "3" + bundle; + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, null); + Assert.assertEquals(candidate.size(), 3); + } + + private void selectBrokerForNamespace(Map>> brokerToNamespaceToBundleRange, + String broker, String namespace, String assignedBundleName) { + Map> nsToBundleMap = Maps.newHashMap(); + nsToBundleMap.put(namespace, Sets.newHashSet(assignedBundleName)); + brokerToNamespaceToBundleRange.put(broker, nsToBundleMap); + } + + /** + * It verifies anti-affinity with failure domain enabled with 2 brokers. + * + *
+     * 1. Register brokers to domain: domain-1: broker1 & domain-2: broker2
+     * 2. Load-Manager receives a watch and updates brokerToDomain cache with new domain data
+     * 3. Create two namespace with anti-affinity
+     * 4. Load-manager selects broker for each namespace such that from different domains
+     * 
+     * 
+ * + * @throws Exception + */ + @Test + public void testBrokerSelectionForAntiAffinityGroup() throws Exception { + + final String broker1 = primaryHost; + final String broker2 = secondaryHost; + final String cluster = pulsar1.getConfiguration().getClusterName(); + final String property = "prop"; + final String namespace1 = property + "/" + cluster + "/ns1"; + final String namespace2 = property + "/" + cluster + "/ns2"; + final String namespaceAntiAffinityGroup = "group"; + FailureDomain domain = new FailureDomain(); + domain.brokers = Sets.newHashSet(broker1); + admin1.clusters().createFailureDomain(cluster, "domain1", domain); + domain.brokers = Sets.newHashSet(broker2); + admin1.clusters().createFailureDomain(cluster, "domain1", domain); + admin1.properties().createProperty(property, new PropertyAdmin(null, Sets.newHashSet(cluster))); + admin1.namespaces().createNamespace(namespace1); + admin1.namespaces().createNamespace(namespace2); + admin1.namespaces().setNamespaceAntiAffinityGroup(namespace1, namespaceAntiAffinityGroup); + admin1.namespaces().setNamespaceAntiAffinityGroup(namespace2, namespaceAntiAffinityGroup); + + // validate strategically if brokerToDomainCache updated + for (int i = 0; i < 5; i++) { + if (!isLoadManagerUpdatedDomainCache(primaryLoadManager) + || !isLoadManagerUpdatedDomainCache(secondaryLoadManager) || i != 4) { + Thread.sleep(200); + } + } + assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager)); + assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager)); + + ServiceUnitId serviceUnit = makeBundle(property, cluster, "ns1"); + String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit); + + serviceUnit = makeBundle(property, cluster, "ns2"); + String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit); + + assertNotEquals(selectedBroker1, selectedBroker2); + + } + + /** + * It verifies that load-shedding task should unload namespace only if there is a broker available which doesn't + * cause uneven anti-affinitiy namespace distribution. + * + *
+     * 1. broker1 owns ns-0 => broker1 can unload ns-0
+     * 1. broker2 owns ns-1 => broker1 can unload ns-0
+     * 1. broker3 owns ns-2 => broker1 can't unload ns-0 as all brokers have same no NS
+     * 
+ * + * @throws Exception + */ + @Test + public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception { + + final String namespace = "my-property/use/my-ns"; + final int totalNamespaces = 5; + final String namespaceAntiAffinityGroup = "my-antiaffinity"; + final String bundle = "/0x00000000_0xffffffff"; + + admin1.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + + for (int i = 0; i < totalNamespaces; i++) { + final String ns = namespace + i; + admin1.namespaces().createNamespace(ns); + admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup); + } + + Set brokers = Sets.newHashSet(); + Set candidate = Sets.newHashSet(); + Map>> brokerToNamespaceToBundleRange = Maps.newHashMap(); + brokers.add("broker-0"); + brokers.add("broker-1"); + brokers.add("broker-2"); + + String assignedNamespace = namespace + "0" + bundle; + + // all brokers available so, candidate will be all 3 brokers + candidate.addAll(brokers); + // add ns-0 to broker-0 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace); + String currentBroker = "broker-0"; + boolean shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, + currentBroker, pulsar1, brokerToNamespaceToBundleRange, candidate); + assertTrue(shouldUnload); + // add ns-1 to broker-1 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace); + shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker, + pulsar1, brokerToNamespaceToBundleRange, candidate); + assertTrue(shouldUnload); + // add ns-2 to broker-2 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace); + shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker, + pulsar1, brokerToNamespaceToBundleRange, candidate); + assertFalse(shouldUnload); + + } + + /** + * It verifies that load-manager::shouldAntiAffinityNamespaceUnload checks that unloading should only happen if all + * brokers have same number of anti-affinity namespaces + * + * @throws Exception + */ + @Test + public void testLoadSheddingWithAntiAffinityNamespace() throws Exception { + + final String namespace = "my-property/use/my-ns"; + final int totalNamespaces = 5; + final String namespaceAntiAffinityGroup = "my-antiaffinity"; + final String bundle = "0x00000000_0xffffffff"; + + admin1.properties().createProperty("my-property", + new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); + + for (int i = 0; i < totalNamespaces; i++) { + final String ns = namespace + i; + admin1.namespaces().createNamespace(ns); + admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup); + } + + PulsarClient pulsarClient = PulsarClient.create(pulsar1.getWebServiceAddress()); + Producer producer = pulsarClient.createProducer("persistent://" + namespace + "0/my-topic1"); + ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1 + .getLoadManager().get()).getLoadManager(); + + pulsar1.getBrokerService().updateRates(); + loadManager.updateAll(); + + assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, primaryHost)); + producer.close(); + pulsarClient.close(); + } + + private boolean isLoadManagerUpdatedDomainCache(ModularLoadManagerImpl loadManager) throws Exception { + Field mapField = ModularLoadManagerImpl.class.getDeclaredField("brokerToFailureDomainMap"); + mapField.setAccessible(true); + Map map = (Map) mapField.get(loadManager); + return !map.isEmpty(); + } + + private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) { + return nsFactory.getBundle(NamespaceName.get(property, cluster, namespace), + Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND, + BoundType.CLOSED)); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index cc4c0823cec42..fce825c02ac78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -499,7 +499,7 @@ public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws final String broker1Address = pulsar1.getAdvertisedAddress() + "0"; final String broker2Address = pulsar2.getAdvertisedAddress() + "1"; final String sharedBroker = "broker3"; - admin1.clusters().createCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress())); + admin1.clusters().updateCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress())); admin1.properties().createProperty(property, new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster))); admin1.namespaces().createNamespace(property + "/" + cluster + "/" + namespace); @@ -535,7 +535,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { // test1: shared=1, primary=1, secondary=1 => It should return 1 primary broker only Set brokerCandidateCache = Sets.newHashSet(); Set availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address); - LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, + LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, availableBrokers, brokerTopicLoadingPredicate); assertEquals(brokerCandidateCache.size(), 1); assertTrue(brokerCandidateCache.contains(broker1Address)); @@ -543,7 +543,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { // test2: shared=1, primary=0, secondary=1 => It should return 1 secondary broker only brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker, broker2Address); - LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, + LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, availableBrokers, brokerTopicLoadingPredicate); assertEquals(brokerCandidateCache.size(), 1); assertTrue(brokerCandidateCache.contains(broker2Address)); @@ -551,7 +551,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { // test3: shared=1, primary=0, secondary=0 => It should return 0 broker brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker); - LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, + LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, availableBrokers, brokerTopicLoadingPredicate); assertEquals(brokerCandidateCache.size(), 0); @@ -565,7 +565,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { // test1: shared=1, primary=1, secondary=1 => It should return primary + secondary brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address); - LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, + LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, availableBrokers, brokerTopicLoadingPredicate); assertEquals(brokerCandidateCache.size(), 2); assertTrue(brokerCandidateCache.contains(broker1Address)); @@ -574,7 +574,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { // test2: shared=1, secondary=1 => It should return secondary brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker, broker2Address); - LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, + LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, availableBrokers, brokerTopicLoadingPredicate); assertEquals(brokerCandidateCache.size(), 1); assertTrue(brokerCandidateCache.contains(broker2Address)); @@ -582,7 +582,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) { // test3: shared=1, => It should return 0 broker brokerCandidateCache = Sets.newHashSet(); availableBrokers = Sets.newHashSet(sharedBroker); - LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, + LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache, availableBrokers, brokerTopicLoadingPredicate); assertEquals(brokerCandidateCache.size(), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 720b48eb381aa..286a700b5d71d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -95,7 +95,7 @@ void setup() throws Exception { adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); admin = new PulsarAdmin(adminUrl, (Authentication) null); - admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString())); + admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString())); admin.properties().createProperty("prop", new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc"))); admin.namespaces().createNamespace("prop/usc/ns-quota"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 44372e7309dcc..e487f125217ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -203,11 +203,11 @@ void setup() throws Exception { admin3 = new PulsarAdmin(url3, (Authentication) null); // Provision the global namespace - admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(), + admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(), pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); - admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(), + admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(), pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls())); - admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(), + admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(), pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls())); admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java index d9a7c0fca42cf..0ac89e4672e28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.zookeeper; -import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -29,7 +29,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseTest { @@ -52,11 +52,11 @@ protected void cleanup() throws Exception { public void testSessionExpired() throws Exception { admin.clusters().createCluster("my-cluster", new ClusterData("test-url")); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster")); + assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster")); mockZookKeeper.failNow(Code.SESSIONEXPIRED); - assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster")); + assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster")); try { admin.clusters().createCluster("my-cluster-2", new ClusterData("test-url")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 7c7cff4e938dd..7324074d6bb86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -63,6 +63,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem"; private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; + private final String configClustername = "use"; @BeforeMethod @Override @@ -93,8 +94,6 @@ protected void setup() throws Exception { providers.add(AuthenticationProviderTls.class.getName()); conf.setAuthenticationProviders(providers); - conf.setClusterName("use"); - super.init(); } @@ -156,7 +155,7 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exceptio consumer.close(); } - @Test(dataProvider = "batch") + @Test(timeOut = 5000, dataProvider = "batch") public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception { log.info("-- Starting {} test --", methodName); @@ -178,7 +177,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep log.info("-- Exiting {} test --", methodName); } - @Test(dataProvider = "batch") + @Test(timeOut = 5000, dataProvider = "batch") public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception { log.info("-- Starting {} test --", methodName); @@ -226,7 +225,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws * * @throws Exception */ - @Test + @Test(timeOut = 5000) public void testAuthenticationFilterNegative() throws Exception { log.info("-- Starting {} test --", methodName); @@ -257,7 +256,7 @@ public void testAuthenticationFilterNegative() throws Exception { * * @throws Exception */ - @Test + @Test(timeOut = 5000) public void testInternalServerExceptionOnLookup() throws Exception { log.info("-- Starting {} test --", methodName); @@ -268,7 +267,7 @@ public void testInternalServerExceptionOnLookup() throws Exception { authTls.configure(authParams); internalSetup(authTls); - admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().updateCluster(configClustername, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 4a8d8cf4c42cf..0d1b8c3d8e23e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -73,6 +73,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class); + private final String configClusterName = "r1"; @DataProvider(name = "subscriptionType") public Object[][] getSubscriptionType() { @@ -835,7 +836,7 @@ void setupReplicationCluster() throws Exception { // completely // independent config objects instead of referring to the same properties object ServiceConfiguration config1 = new ServiceConfiguration(); - config1.setClusterName("r1"); + config1.setClusterName(configClusterName); config1.setWebServicePort(webServicePort1); config1.setZookeeperServers("127.0.0.1:" + zkPort1); config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); @@ -901,11 +902,11 @@ void setupReplicationCluster() throws Exception { admin3 = new PulsarAdmin(url3, (Authentication) null); // Provision the global namespace - admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(), + admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); - admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(), + admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); - admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(), + admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls())); admin1.clusters().createCluster("global", new ClusterData("http://global:8080")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java index b8c1cde6ae320..7b8e083f20f68 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java @@ -40,6 +40,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase { protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem"; protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; + private final String clusterName = "use"; @BeforeMethod @Override @@ -62,7 +63,7 @@ protected void internalSetUpForBroker() throws Exception { conf.setTlsEnabled(true); conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); - conf.setClusterName("use"); + conf.setClusterName(clusterName); } protected void internalSetUpForClient() throws Exception { @@ -78,7 +79,7 @@ protected void internalSetUpForNamespace() throws Exception { clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); clientConf.setUseTls(true); admin = spy(new PulsarAdmin(brokerUrlTls, clientConf)); - admin.clusters().createCluster("use", + admin.clusters().updateCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index c996c9f7130b0..302f2e462cf65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -44,7 +44,8 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { private WebSocketService service; - private static final int TEST_PORT = PortManager.nextFreePort();; + private static final int TEST_PORT = PortManager.nextFreePort(); + private final String configClusterName = "c1"; public ProxyAuthorizationTest() { super(); @@ -53,7 +54,7 @@ public ProxyAuthorizationTest() { @BeforeClass @Override protected void setup() throws Exception { - conf.setClusterName("c1"); + conf.setClusterName(configClusterName); internalSetup(); WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); @@ -81,7 +82,7 @@ public void test() throws Exception { assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false); - admin.clusters().createCluster("c1", new ClusterData()); + admin.clusters().updateCluster(configClusterName, new ClusterData()); admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1"))); waitForChange(); admin.namespaces().createNamespace("p1/c1/ns1"); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 1984e2a25dd97..779e7b117e54a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; /** @@ -284,4 +285,135 @@ void updateNamespaceIsolationPolicy(String cluster, String policyName, Namespace */ NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException; + /** + * Create a domain into cluster + *

+ * + * @param cluster + * Cluster name + * + * @param domainName + * domain name + * + * @param FailureDomain + * Domain configurations + * + * @return + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * + * @throws ConflictException + * Broker already exist into other domain + * + * @throws NotFoundException + * Cluster doesn't exist + * + * @throws PreconditionFailedException + * Cluster doesn't exist + * + * @throws PulsarAdminException + * Unexpected error + */ + void createFailureDomain(String cluster, String domainName, FailureDomain domain) + throws PulsarAdminException; + + + /** + * Update a domain into cluster + *

+ * + * @param cluster + * Cluster name + * + * @param domainName + * domain name + * + * @param FailureDomain + * Domain configurations + * + * @return + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * + * @throws ConflictException + * Broker already exist into other domain + * + * @throws NotFoundException + * Cluster doesn't exist + * + * @throws PreconditionFailedException + * Cluster doesn't exist + * + * @throws PulsarAdminException + * Unexpected error + */ + void updateFailureDomain(String cluster, String domainName, FailureDomain domain) + throws PulsarAdminException; + + + /** + * Delete a domain in cluster + *

+ * + * @param cluster + * Cluster name + * + * @param domainName + * Domain name + * + * @return + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * + * @throws NotFoundException + * Cluster doesn't exist + * + * @throws PreconditionFailedException + * Cluster doesn't exist + * + * @throws PulsarAdminException + * Unexpected error + */ + + void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException; + + /** + * Get all registered domains in cluster + *

+ * + * @param cluster + * Cluster name + * @return + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * + * @throws NotFoundException + * Cluster don't exist + * + * @throws PulsarAdminException + * Unexpected error + */ + Map getFailureDomains(String cluster) throws PulsarAdminException; + + /** + * Get the domain registered into a cluster + *

+ * + * @param cluster + * Cluster name + * @return + * @throws NotAuthorizedException + * You don't have admin permission to create the cluster + * + * @throws NotFoundException + * Domain doesn't exist + * + * @throws PreconditionFailedException + * Cluster doesn't exist + * + * @throws PulsarAdminException + * Unexpected error + */ + FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException; + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index ebe754c8e0dad..3c8d3d51efa48 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -420,6 +420,78 @@ public interface Namespaces { * Unexpected error */ void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException; + + + /** + * Set anti-affinity group name for a namespace + *

+ * Request example: + * + * @param namespace + * Namespace name + * @param namespaceAntiAffinityGroup + * anti-affinity group name for a namespace + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) throws PulsarAdminException; + + /** + * Get all namespaces that grouped with given anti-affinity group + * + * @param property + * property is only used for authorization. Client has to be admin of any of the property to access this + * api api. + * @param cluster + * cluster name + * @param namespaceAntiAffinityGroup + * Anti-affinity group name + * @return list of namespace grouped under a given anti-affinity group + * @throws PulsarAdminException + */ + List getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup) + throws PulsarAdminException; + + /** + * Get anti-affinity group name for a namespace + *

+ * Response example: + * + *

+     * 60
+     * 
+ * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException; + + /** + * Delete anti-affinity group name for a namespace. + * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * You don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException; /** * Set the deduplication status for all topics within a namespace. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 2fcf44a291386..666a2e8b07f8a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -150,4 +151,49 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String throw getApiException(e); } } + + @Override + public void createFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException { + setDomain(cluster, domainName, domain); + } + + @Override + public void updateFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException { + setDomain(cluster, domainName, domain); + } + + @Override + public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException { + request(clusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class); + } + + @Override + public Map getFailureDomains(String cluster) throws PulsarAdminException { + try { + return request(clusters.path(cluster).path("failureDomains")) + .get(new GenericType>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException { + try { + return request(clusters.path(cluster).path("failureDomains").path(domainName)).get(FailureDomain.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + private void setDomain(String cluster, String domainName, + FailureDomain domain) throws PulsarAdminException { + try { + request(clusters.path(cluster).path("failureDomains").path(domainName)).post( + Entity.entity(domain, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 79aa162c4e18f..3f0c7e71c5cc8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -228,6 +228,55 @@ public void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws Pu } } + @Override + public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) + throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) + .path("antiAffinity")).post(Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON), + ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) + .path("antiAffinity")).get(new GenericType() { + }); + + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public List getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup) + throws PulsarAdminException { + try { + return request(namespaces.path(cluster).path("antiAffinity").path(namespaceAntiAffinityGroup) + .queryParam("property", property)).get(new GenericType>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName()) + .path("antiAffinity")).delete(ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException { try { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index 84bced071aceb..5ba6c1077337c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -21,9 +21,11 @@ import java.util.Arrays; import org.apache.commons.lang3.StringUtils; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; @@ -125,6 +127,84 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Create a new failure-domain for a cluster. updates it if already created.") + private class CreateFailureDomain extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private java.util.List params; + + @Parameter(names = "--domain-name", description = "domain-name", required = true) + private String domainName; + + @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false) + private String brokerList; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + FailureDomain domain = new FailureDomain(); + domain.setBrokers((isNotBlank(brokerList) ? Sets.newHashSet(brokerList.split(",")): null)); + admin.clusters().createFailureDomain(cluster, domainName, domain); + } + } + + @Parameters(commandDescription = "Update failure-domain for a cluster. Creates a new one if not exist.") + private class UpdateFailureDomain extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private java.util.List params; + + @Parameter(names = "--domain-name", description = "domain-name", required = true) + private String domainName; + + @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false) + private String brokerList; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + FailureDomain domain = new FailureDomain(); + domain.setBrokers((isNotBlank(brokerList) ? Sets.newHashSet(brokerList.split(",")) : null)); + admin.clusters().updateFailureDomain(cluster, domainName, domain); + } + } + + @Parameters(commandDescription = "Deletes an existing failure-domain") + private class DeleteFailureDomain extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private java.util.List params; + + @Parameter(names = "--domain-name", description = "domain-name", required = true) + private String domainName; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + admin.clusters().deleteFailureDomain(cluster, domainName); + } + } + + @Parameters(commandDescription = "List the existing failure-domains for a cluster") + private class ListFailureDomains extends CliCommand { + + @Parameter(description = "cluster-name\n", required = true) + private java.util.List params; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + print(admin.clusters().getFailureDomains(cluster)); + } + } + + @Parameters(commandDescription = "Get the configuration brokers of a failure-domain") + private class GetFailureDomain extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private java.util.List params; + + @Parameter(names = "--domain-name", description = "domain-name", required = true) + private String domainName; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + print(admin.clusters().getFailureDomain(cluster, domainName)); + } + } + public CmdClusters(PulsarAdmin admin) { super("clusters", admin); jcommander.addCommand("get", new Get()); @@ -133,6 +213,11 @@ public CmdClusters(PulsarAdmin admin) { jcommander.addCommand("delete", new Delete()); jcommander.addCommand("list", new List()); jcommander.addCommand("update-peer-clusters", new UpdatePeerClusters()); + jcommander.addCommand("get-failure-domain", new GetFailureDomain()); + jcommander.addCommand("create-failure-domain", new CreateFailureDomain()); + jcommander.addCommand("update-failure-domain", new UpdateFailureDomain()); + jcommander.addCommand("delete-failure-domain", new DeleteFailureDomain()); + jcommander.addCommand("list-failure-domains", new ListFailureDomains()); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 37d7306f91f80..e04eef7be9493 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -213,6 +213,65 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Set Anti-affinity group name for a namspace") + private class SetAntiAffinityGroup extends CliCommand { + @Parameter(description = "property/cluster/namespace", required = true) + private java.util.List params; + + @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true) + private String antiAffinityGroup; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup); + } + } + + @Parameters(commandDescription = "Get Anti-affinity group name for a namspace") + private class GetAntiAffinityGroup extends CliCommand { + @Parameter(description = "property/cluster/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)); + } + } + + @Parameters(commandDescription = "Get Anti-affinity namespaces grouped with the given anti-affinity group name") + private class GetAntiAffinityNamespaces extends CliCommand { + + @Parameter(names = { "--property", + "-p" }, description = "property is only used for authorization. Client has to be admin of any of the property to access this api", required = false) + private String property; + + @Parameter(names = { "--cluster", "-c" }, description = "Cluster name", required = true) + private String cluster; + + @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true) + private String antiAffinityGroup; + + @Override + void run() throws PulsarAdminException { + print(admin.namespaces().getAntiAffinityNamespaces(property, cluster, antiAffinityGroup)); + } + } + + @Parameters(commandDescription = "Remove Anti-affinity group name for a namspace") + private class DeleteAntiAffinityGroup extends CliCommand { + @Parameter(description = "property/cluster/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace); + } + } + + @Parameters(commandDescription = "Enable or disable deduplication for a namespace") private class SetDeduplication extends CliCommand { @Parameter(description = "property/cluster/namespace", required = true) @@ -621,6 +680,11 @@ public CmdNamespaces(PulsarAdmin admin) { jcommander.addCommand("get-message-ttl", new GetMessageTTL()); jcommander.addCommand("set-message-ttl", new SetMessageTTL()); + + jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup()); + jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup()); + jcommander.addCommand("get-anti-affinity-namespaces", new GetAntiAffinityNamespaces()); + jcommander.addCommand("delete-anti-affinity-group", new DeleteAntiAffinityGroup()); jcommander.addCommand("set-deduplication", new SetDeduplication()); diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 69d57da74b913..ba12db7e8badf 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -131,6 +132,24 @@ void clusters() throws Exception { clusters.run(split("delete use")); verify(mockClusters).deleteCluster("use"); + + clusters.run(split("list-failure-domains use")); + verify(mockClusters).getFailureDomains("use"); + + clusters.run(split("get-failure-domain use --domain-name domain")); + verify(mockClusters).getFailureDomain("use", "domain"); + + clusters.run(split("create-failure-domain use --domain-name domain --broker-list b1")); + FailureDomain domain = new FailureDomain(); + domain.setBrokers(Sets.newHashSet("b1")); + verify(mockClusters).createFailureDomain("use", "domain", domain); + + clusters.run(split("update-failure-domain use --domain-name domain --broker-list b1")); + verify(mockClusters).updateFailureDomain("use", "domain", domain); + + clusters.run(split("delete-failure-domain use --domain-name domain")); + verify(mockClusters).deleteFailureDomain("use", "domain"); + // Re-create CmdClusters to avoid a issue. // See https://github.com/cbeust/jcommander/issues/271 @@ -284,6 +303,19 @@ void namespaces() throws Exception { namespaces.run(split("get-message-ttl myprop/clust/ns1")); verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1"); + + namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group")); + verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group"); + + namespaces.run(split("get-anti-affinity-group myprop/clust/ns1")); + verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1"); + + namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group")); + verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group"); + + namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 ")); + verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1"); + namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M")); verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1)); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java new file mode 100644 index 0000000000000..dd1d09b8aff6e --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import java.util.HashSet; +import java.util.Set; + +import com.google.common.base.Objects; + +public class FailureDomain { + + public Set brokers = new HashSet(); + + public Set getBrokers() { + return brokers; + } + + public void setBrokers(Set brokers) { + this.brokers = brokers; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FailureDomain) { + FailureDomain other = (FailureDomain) obj; + return Objects.equal(brokers, other.brokers); + } + + return false; + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("brokers", brokers).toString(); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index a46b5fee03787..f3486b8592029 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -42,6 +42,7 @@ public class Policies { public int message_ttl_in_seconds = 0; public RetentionPolicies retention_policies = null; public boolean deleted = false; + public String antiAffinityGroup; public static final String FIRST_BOUNDARY = "0x00000000"; public static final String LAST_BOUNDARY = "0xffffffff"; @@ -63,7 +64,8 @@ public boolean equals(Object obj) { && message_ttl_in_seconds == other.message_ttl_in_seconds && Objects.equals(retention_policies, other.retention_policies) && Objects.equals(encryption_required, other.encryption_required) - && Objects.equals(subscription_auth_mode, other.subscription_auth_mode); + && Objects.equals(subscription_auth_mode, other.subscription_auth_mode) + && Objects.equals(antiAffinityGroup, other.antiAffinityGroup); } return false; @@ -86,6 +88,7 @@ public String toString() { .add("deduplicationEnabled", deduplicationEnabled) .add("clusterDispatchRate", clusterDispatchRate) .add("latency_stats_sample_rate", latency_stats_sample_rate) + .add("antiAffinityGroup", antiAffinityGroup) .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies) .add("deleted", deleted) .add("encryption_required", encryption_required) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index 06bd0557e005f..556a8b548807b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -64,6 +64,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private final String configClusterName = "use"; @BeforeMethod @Override @@ -92,7 +93,7 @@ protected void setup() throws Exception { providers.add(AuthenticationProviderTls.class.getName()); conf.setAuthenticationProviders(providers); - conf.setClusterName("use"); + conf.setClusterName(configClusterName); super.init(); @@ -157,7 +158,7 @@ public void testTlsSyncProducerAndConsumer() throws Exception { // create a client which connects to proxy over tls and pass authData PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl); - admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); From 4396a8ba6cc1bdfbefdfd0d78048dd70632f85e2 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 6 Dec 2017 18:44:47 -0800 Subject: [PATCH 2/5] rebase master --- .../apache/pulsar/broker/admin/Clusters.java | 11 ++++++++--- .../pulsar/broker/web/PulsarWebResource.java | 19 ++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java index d00826a20c709..612e2e254b069 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java @@ -146,9 +146,14 @@ public void updateCluster(@PathParam("cluster") String cluster, ClusterData clus String clusterPath = path("clusters", cluster); Stat nodeStat = new Stat(); byte[] content = globalZk().getData(clusterPath, null, nodeStat); - ClusterData currentClusterData = jsonMapper().readValue(content, ClusterData.class); - // only update cluster-url-data and not overwrite other metadata such as peerClusterNames - currentClusterData.update(clusterData); + ClusterData currentClusterData = null; + if (content.length > 0) { + currentClusterData = jsonMapper().readValue(content, ClusterData.class); + // only update cluster-url-data and not overwrite other metadata such as peerClusterNames + currentClusterData.update(clusterData); + } else { + currentClusterData = clusterData; + } // Write back the new updated ClusterData into zookeeper globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(currentClusterData), nodeStat.getVersion()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 6700649616212..40b7d03f2fdca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -174,19 +174,20 @@ protected void validateAdminAccessOnProperty(String property) { } protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{ - PropertyAdmin propertyAdmin; - - try { - propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property)) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist")); - } catch (KeeperException.NoNodeException e) { - log.warn("Failed to get property admin data for non existing property {}", property); - throw new RestException(Status.NOT_FOUND, "Property does not exist"); - } if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) { log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property, (isClientAuthenticated(clientAppId)), clientAppId); + + PropertyAdmin propertyAdmin; + + try { + propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property)) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist")); + } catch (KeeperException.NoNodeException e) { + log.warn("Failed to get property admin data for non existing property {}", property); + throw new RestException(Status.NOT_FOUND, "Property does not exist"); + } if (!isClientAuthenticated(clientAppId)) { throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request"); From 42d5170801673cb37469fe8b1c29379f5b93e8d9 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Fri, 8 Dec 2017 14:45:37 -0800 Subject: [PATCH 3/5] fix test --- .../pulsar/discovery/service/web/DiscoveryServiceWebTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java index 3bac504226361..8398ea2ec41f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java @@ -76,7 +76,7 @@ protected void cleanup() throws Exception { * @throws Exception */ @Test - public void testRiderectUrlWithServerStarted() throws Exception { + public void testRedirectUrlWithServerStarted() throws Exception { // 1. start server int port = PortManager.nextFreePort(); ServiceConfig config = new ServiceConfig(); @@ -100,7 +100,7 @@ public void testRiderectUrlWithServerStarted() throws Exception { **/ assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")), - "Property does not exist"); + "Cannot set replication on a non-global namespace"); assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Property does not exist"); assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Property does not exist"); From 6ef8f64f487cb53680fc2b22c1b440768ab92b15 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 24 Jan 2018 11:23:39 -0800 Subject: [PATCH 4/5] fix auth test --- .../AuthenticatedProducerConsumerTest.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 7324074d6bb86..83b15f5a6d3c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -63,7 +63,6 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem"; private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; - private final String configClustername = "use"; @BeforeMethod @Override @@ -94,6 +93,8 @@ protected void setup() throws Exception { providers.add(AuthenticationProviderTls.class.getName()); conf.setAuthenticationProviders(providers); + conf.setClusterName("use"); + super.init(); } @@ -155,7 +156,7 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exceptio consumer.close(); } - @Test(timeOut = 5000, dataProvider = "batch") + @Test(dataProvider = "batch") public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception { log.info("-- Starting {} test --", methodName); @@ -166,7 +167,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep authTls.configure(authParams); internalSetup(authTls); - admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); @@ -177,7 +178,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep log.info("-- Exiting {} test --", methodName); } - @Test(timeOut = 5000, dataProvider = "batch") + @Test(dataProvider = "batch") public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception { log.info("-- Starting {} test --", methodName); @@ -188,7 +189,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws authTls.configure(authParams); internalSetup(authTls); - admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use"))); @@ -225,7 +226,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws * * @throws Exception */ - @Test(timeOut = 5000) + @Test public void testAuthenticationFilterNegative() throws Exception { log.info("-- Starting {} test --", methodName); @@ -242,7 +243,7 @@ public void testAuthenticationFilterNegative() throws Exception { // this will cause NPE and it should throw 500 doReturn(null).when(pulsar).getGlobalZkCache(); try { - admin.clusters().createCluster(cluster, clusterData); + admin.clusters().updateCluster(cluster, clusterData); } catch (PulsarAdminException e) { Assert.assertTrue(e.getCause() instanceof InternalServerErrorException); } @@ -256,7 +257,7 @@ public void testAuthenticationFilterNegative() throws Exception { * * @throws Exception */ - @Test(timeOut = 5000) + @Test public void testInternalServerExceptionOnLookup() throws Exception { log.info("-- Starting {} test --", methodName); @@ -267,7 +268,7 @@ public void testInternalServerExceptionOnLookup() throws Exception { authTls.configure(authParams); internalSetup(authTls); - admin.clusters().updateCluster(configClustername, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), + admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); @@ -291,4 +292,4 @@ public void testInternalServerExceptionOnLookup() throws Exception { } -} +} \ No newline at end of file From c9193438dc45fe42a6dff7ac12df4b758c946a7f Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 24 Jan 2018 13:48:05 -0800 Subject: [PATCH 5/5] fix test with updateCluster --- .../apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 1 - .../apache/pulsar/broker/service/BrokerBkEnsemblesTests.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 8228d21da4a9d..57ed5e5e4d05c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -97,7 +97,6 @@ protected void resetConfig() { this.conf.setActiveConsumerFailoverDelayTimeMillis(0); this.conf.setDefaultNumberOfNamespaceBundles(1); this.conf.setZookeeperServers("localhost:2181"); - this.conf.setClusterName("mock"); } protected final void internalSetup() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 347ffaf5640c9..3e1f9d4cbd5fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -92,7 +92,7 @@ void setup() throws Exception { adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); admin = new PulsarAdmin(adminUrl, (Authentication) null); - admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString())); + admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString())); admin.properties().createProperty("prop", new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc"))); } catch (Throwable t) {