Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] topic resources use metadata-store api #9485

Merged
merged 2 commits into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ public void start() throws PulsarServerException {
coordinationService = new CoordinationServiceImpl(localMetadataStore);

configurationMetadataStore = createConfigurationMetadataStore();
pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore);
pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
config.getZooKeeperOperationTimeoutSeconds());

orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
Expand All @@ -81,7 +84,6 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,14 +135,6 @@ protected void zkCreateOptimisticAsync(ZooKeeper zk, String path,
CreateMode.PERSISTENT, callback, null);
}

protected boolean zkPathExists(String path) throws KeeperException, InterruptedException {
Stat stat = globalZk().exists(path, false);
if (null != stat) {
return true;
}
return false;
}

protected void zkSync(String path) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger rc = new AtomicInteger(KeeperException.Code.OK.intValue());
Expand Down Expand Up @@ -247,29 +241,31 @@ protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {

private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
zkCreateOptimisticAsync(localZk(),
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
namespaceResources().getLocalStore()
.put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], Optional.of(-1L))
.thenAccept(r -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(), topicName.getPartition(partition));
}
result.complete(null);
}).exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.complete(null);
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition, result);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
});
} else if (ex.getCause() instanceof BadVersionException) {
log.warn("[{}] Partitioned topic {} is already created.", clientAppId(),
topicName.getPartition(partition));
// metadata-store api returns BadVersionException if node already exists while creating the
// resource
result.complete(null);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), ex.getCause());
result.completeExceptionally(ex.getCause());
}
return null;
});
return result;
}

Expand Down Expand Up @@ -729,11 +725,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
List<String> topics = namespaceResources().getChildren(partitionedTopicPath);
partitionedTopics = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (KeeperException.NoNodeException e) {
} catch (NotFoundException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
Expand Down Expand Up @@ -828,49 +824,37 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n

try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
globalZk().sync(path, (rc2, s2, ctx) -> {
if (KeeperException.Code.OK.intValue() == rc2) {
log.info("[{}] Successfully created partitioned topic {}",
namespaceResources().getPartitionedTopicResources()
.createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
}).exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException) {
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}",
clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}",
clientAppId(), topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
} else {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName,
KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
new RestException(Status.CONFLICT, "Partitioned topic already exists"));
} else if (ex.getCause() instanceof BadVersionException) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
ex.getCause());
asyncResponse.resume(new RestException(ex.getCause()));
}
}, null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT,
"Partitioned topic already exists"));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(),
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc))));
}
});
return null;
});
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -41,20 +42,23 @@ public class BaseResources<T> {
private final MetadataStoreExtended store;
@Getter
private final MetadataCache<T> cache;
private int operationTimeoutSec;

public BaseResources(MetadataStoreExtended store, Class<T> clazz) {
public BaseResources(MetadataStoreExtended store, Class<T> clazz, int operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(clazz);
this.operationTimeoutSec = operationTimeoutSec;
}

public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef) {
public BaseResources(MetadataStoreExtended store, TypeReference<T> typeRef, int operationTimeoutSec) {
this.store = store;
this.cache = store.getMetadataCache(typeRef);
this.operationTimeoutSec = operationTimeoutSec;
}

public List<String> getChildren(String path) throws MetadataStoreException {
try {
return getChildrenAsync(path).get();
return getChildrenAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -69,7 +73,7 @@ public CompletableFuture<List<String>> getChildrenAsync(String path) {

public Optional<T> get(String path) throws MetadataStoreException {
try {
return getAsync(path).get();
return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -84,7 +88,7 @@ public CompletableFuture<Optional<T>> getAsync(String path) {

public void set(String path, Function<T, T> modifyFunction) throws MetadataStoreException {
try {
setAsync(path, modifyFunction).get();
setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -99,7 +103,7 @@ public CompletableFuture<Void> setAsync(String path, Function<T, T> modifyFuncti

public void setWithCreate(String path, Function<Optional<T>, T> createFunction) throws MetadataStoreException {
try {
setWithCreateAsync(path, createFunction).get();
setWithCreateAsync(path, createFunction).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -114,7 +118,7 @@ public CompletableFuture<Void> setWithCreateAsync(String path, Function<Optional

public void create(String path, T data) throws MetadataStoreException {
try {
createAsync(path, data).get();
createAsync(path, data).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -129,7 +133,7 @@ public CompletableFuture<Void> createAsync(String path, T data) {

public void delete(String path) throws MetadataStoreException {
try {
deleteAsync(path).get();
deleteAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand All @@ -144,7 +148,7 @@ public CompletableFuture<Void> deleteAsync(String path) {

public boolean exists(String path) throws MetadataStoreException {
try {
return existsAsync(path).get();
return existsAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw (e.getCause() instanceof MetadataStoreException) ? (MetadataStoreException) e.getCause()
: new MetadataStoreException(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class ClusterResources extends BaseResources<ClusterData> {
@Getter
private FailureDomainResources failureDomainResources;

public ClusterResources(MetadataStoreExtended store) {
super(store, ClusterData.class);
this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class);
public ClusterResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, ClusterData.class, operationTimeoutSec);
this.failureDomainResources = new FailureDomainResources(store, FailureDomain.class, operationTimeoutSec);
}

public Set<String> list() throws MetadataStoreException {
Expand All @@ -44,8 +44,9 @@ public Set<String> list() throws MetadataStoreException {
public static class FailureDomainResources extends BaseResources<FailureDomain> {
public static final String FAILURE_DOMAIN = "failureDomain";

public FailureDomainResources(MetadataStoreExtended store, Class<FailureDomain> clazz) {
super(store, clazz);
public FailureDomainResources(MetadataStoreExtended store, Class<FailureDomain> clazz,
int operationTimeoutSec) {
super(store, clazz, operationTimeoutSec);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

public class DynamicConfigurationResources extends BaseResources<Map<String, String>> {

public DynamicConfigurationResources(MetadataStoreExtended store) {
super(store, new TypeReference<Map<String, String>>(){});
public DynamicConfigurationResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, new TypeReference<Map<String, String>>() {
}, operationTimeoutSec);
}

}
Loading