Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-7 Introduce Failure-domain and Anti-affinity-namespace group #896

Merged
merged 5 commits into from
Jan 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ advertisedAddress=
# Name of the cluster to which this broker belongs to
clusterName=

# Enable cluster's failure-domain which can distribute brokers into logical region
failureDomainsEnabled=false

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ advertisedAddress=
# Name of the cluster to which this broker belongs to
clusterName=standalone

# Enable cluster's failure-domain which can distribute brokers into logical region
failureDomainsEnabled=false

# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Name of the cluster to which this broker belongs to
@FieldContext(required = true)
private String clusterName;
// Enable cluster's failure-domain which can distribute brokers into logical region
@FieldContext(dynamic = true)
private boolean failureDomainsEnabled = false;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
// Time to wait for broker graceful shutdown. After this time elapses, the
Expand Down Expand Up @@ -468,6 +471,14 @@ public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

public boolean isFailureDomainsEnabled() {
return failureDomainsEnabled;
}

public void setFailureDomainsEnabled(boolean failureDomainsEnabled) {
this.failureDomainsEnabled = failureDomainsEnabled;
}

public long getBrokerShutdownTimeoutMs() {
return brokerShutdownTimeoutMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Map;

import org.apache.bookkeeper.util.ZkUtils;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
Expand All @@ -39,6 +41,7 @@
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Maps;

/**
* ConfigurationCacheService maintains a local in-memory cache of all the configurations and policies stored in
Expand All @@ -53,13 +56,21 @@ public class ConfigurationCacheService {
private ZooKeeperDataCache<Policies> policiesCache;
private ZooKeeperDataCache<ClusterData> clustersCache;
private ZooKeeperChildrenCache clustersListCache;
private ZooKeeperChildrenCache failureDomainListCache;
private ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache;
private ZooKeeperDataCache<FailureDomain> failureDomainCache;

public static final String POLICIES = "policies";
public static final String FAILURE_DOMAIN = "failureDomain";
public final String CLUSTER_FAILURE_DOMAIN_ROOT;
public static final String POLICIES_ROOT = "/admin/policies";
private static final String CLUSTERS_ROOT = "/admin/clusters";

public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException {
this(cache, null);
}

public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName) throws PulsarServerException {
this.cache = cache;

initZK();
Expand All @@ -86,6 +97,12 @@ public ClusterData deserialize(String path, byte[] content) throws Exception {
};

this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT);

CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
if (isNotBlank(configuredClusterName)) {
createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT);
this.failureDomainListCache = new ZooKeeperChildrenCache(cache, CLUSTER_FAILURE_DOMAIN_ROOT);
}

this.namespaceIsolationPoliciesCache = new ZooKeeperDataCache<NamespaceIsolationPolicies>(cache) {
@Override
Expand All @@ -96,6 +113,31 @@ public NamespaceIsolationPolicies deserialize(String path, byte[] content) throw
}));
}
};

this.failureDomainCache = new ZooKeeperDataCache<FailureDomain>(cache) {
@Override
public FailureDomain deserialize(String path, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, FailureDomain.class);
}
};
}

private void createFailureDomainRoot(ZooKeeper zk, String path) {
try {
if (zk.exists(path, false) == null) {
try {
byte[] data = "".getBytes();
ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.info("Successfully created failure-domain znode at {}", path);
} catch (KeeperException.NodeExistsException e) {
// Ok
}
}
} catch (KeeperException.NodeExistsException e) {
// Ok
} catch (Exception e) {
LOG.warn("Failed to create failure-domain znode {} ", path, e);
}
}

private void initZK() throws PulsarServerException {
Expand Down Expand Up @@ -138,11 +180,19 @@ public ZooKeeperChildrenCache clustersListCache() {
return this.clustersListCache;
}

public ZooKeeperChildrenCache failureDomainListCache() {
return this.failureDomainListCache;
}

public ZooKeeper getZooKeeper() {
return this.cache.getZooKeeper();
}

public ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache() {
return this.namespaceIsolationPoliciesCache;
}

public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
return this.failureDomainCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ private void startZkCacheService() throws PulsarServerException {
throw new PulsarServerException(e);
}

this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache(), this.config.getClusterName());
this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.admin;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;

import java.net.MalformedURLException;
import java.net.URI;
Expand All @@ -45,6 +47,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
Expand All @@ -64,7 +67,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
Expand Down Expand Up @@ -202,6 +204,7 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
return namespaces;
}


/**
* Redirect the call to the specified broker
*
Expand Down Expand Up @@ -286,6 +289,14 @@ protected ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPolic
return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache();
}

protected ZooKeeperDataCache<FailureDomain> failureDomainCache() {
return pulsar().getConfigurationCache().failureDomainCache();
}

protected ZooKeeperChildrenCache failureDomainListCache() {
return pulsar().getConfigurationCache().failureDomainListCache();
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
String destination, boolean authoritative) {
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
Expand Down Expand Up @@ -354,5 +365,14 @@ public PartitionedTopicMetadata deserialize(String key, byte[] content) throws E
}
return metadataFuture;
}


protected void validateClusterExists(String cluster) {
try {
if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
}
} catch (Exception e) {
throw new RestException(e);
}
}
}
Loading