Skip to content

Commit

Permalink
[pulsar-broker] topics resources use metadata-store api
Browse files Browse the repository at this point in the history
[pulsar-broker] MockZK: Handle zk-children watch notification

fix test

fix sync function initialization
  • Loading branch information
rdhabalia committed Feb 6, 2021
1 parent 38481a9 commit be4b114
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.junit.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
Expand All @@ -41,6 +48,14 @@

public class MetaStoreImplTest extends MockedBookKeeperTestCase {

@Data
@AllArgsConstructor
@NoArgsConstructor
static class MyClass {
String a;
int b;
}

@Test
void getMLList() throws Exception {
MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);
Expand Down Expand Up @@ -226,4 +241,31 @@ public void operationComplete(Void result, Stat version) {

promise.get();
}

@Test
public void testGetChildrenWatch() throws Exception {
MetadataStore store = new ZKMetadataStore(zkc);
MetadataCache<MyClass> objCache1 = store.getMetadataCache(MyClass.class);

String path = "/managed-ledgers/prop-xyz/ns1/persistent";
assertTrue(objCache1.getChildren(path).get().isEmpty());

CountDownLatch latch = new CountDownLatch(1);
ZkUtils.asyncCreateFullPathOptimistic(zkc, "/managed-ledgers/prop-xyz/ns1/persistent/t1", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path2, ctx, name) -> {
latch.countDown();
}, null);
latch.await();

ManagedLedgerTest.retryStrategically((test) -> {
try {
return !objCache1.getChildren(path).get().isEmpty();
} catch (Exception e) {
// Ok
}
return false;
}, 5, 1000);
assertFalse(objCache1.getChildren(path).get().isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
Expand All @@ -80,7 +83,6 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,14 +130,6 @@ protected void zkCreateOptimisticAsync(ZooKeeper zk, String path,
CreateMode.PERSISTENT, callback, null);
}

protected boolean zkPathExists(String path) throws KeeperException, InterruptedException {
Stat stat = globalZk().exists(path, false);
if (null != stat) {
return true;
}
return false;
}

protected void zkSync(String path) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger rc = new AtomicInteger(KeeperException.Code.OK.intValue());
Expand Down Expand Up @@ -242,29 +236,29 @@ protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {

private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
zkCreateOptimisticAsync(localZk(),
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
namespaceResources().getLocalStore()
.put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], Optional.of(-1L))
.thenAccept(r -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(), topicName.getPartition(partition));
}
result.complete(null);
}).exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.complete(null);
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition, result);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
});
} else if (ex.getCause() instanceof BadVersionException) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition, result);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), ex.getCause());
result.completeExceptionally(ex.getCause());
}
return null;
});
return result;
}

Expand Down Expand Up @@ -726,11 +720,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
try {
String partitionedTopicPath = path(PARTITIONED_TOPIC_PATH_ZNODE,
namespaceName.toString(), topicDomain.value());
List<String> topics = globalZk().getChildren(partitionedTopicPath, false);
List<String> topics = namespaceResources().getChildren(partitionedTopicPath);
partitionedTopics = topics.stream()
.map(s -> String.format("%s://%s/%s", topicDomain.value(), namespaceName.toString(), decode(s)))
.collect(Collectors.toList());
} catch (KeeperException.NoNodeException e) {
} catch (NotFoundException e) {
// NoNode means there are no partitioned topics in this domain for this namespace
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(),
Expand Down Expand Up @@ -825,49 +819,37 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n

try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
globalZk().sync(path, (rc2, s2, ctx) -> {
if (KeeperException.Code.OK.intValue() == rc2) {
log.info("[{}] Successfully created partitioned topic {}",
namespaceResources().getPartitionedTopicResouces()
.createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
}).exceptionally(ex -> {
if (ex.getCause() instanceof AlreadyExistsException) {
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}",
clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}",
clientAppId(), topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
} else {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName,
KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
new RestException(Status.CONFLICT, "Partitioned topic already exists"));
} else if (ex.getCause() instanceof BadVersionException) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
ex.getCause());
asyncResponse.resume(new RestException(ex.getCause()));
}
}, null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.warn("[{}] Failed to create already existing partitioned topic {}",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT,
"Partitioned topic already exists"));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
clientAppId(),
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}",
clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(
new RestException(KeeperException.create(KeeperException.Code.get(rc))));
}
});
return null;
});
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
Expand All @@ -33,11 +34,17 @@
public class NamespaceResources extends BaseResources<Policies> {
private IsolationPolicyResources isolationPolicies;
private LocalPoliciesResources localPolicies;
private PartitionedTopicResources partitionedTopicResouces;
private MetadataStoreExtended localStore;
private MetadataStoreExtended configurationStore;

public NamespaceResources(MetadataStoreExtended localStore, MetadataStoreExtended configurationStore) {
super(configurationStore, Policies.class);
this.localStore = localStore;
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore);
localPolicies = new LocalPoliciesResources(localStore);
partitionedTopicResouces = new PartitionedTopicResources(configurationStore);
}

public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationData>> {
Expand All @@ -57,4 +64,10 @@ public LocalPoliciesResources(MetadataStoreExtended configurationStore) {
super(configurationStore, LocalPolicies.class);
}
}

public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
public PartitionedTopicResources(MetadataStoreExtended configurationStore) {
super(configurationStore, PartitionedTopicMetadata.class);
}
}
}
Loading

0 comments on commit be4b114

Please sign in to comment.