Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix create partitioned topic with a substring of an existing topic name. #6478

Merged
merged 7 commits into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,6 +37,7 @@

import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
Expand All @@ -46,6 +48,7 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -255,35 +258,42 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
return namespaces;
}

protected void tryCreatePartitionsAsync(int numPartitions) {
protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
return;
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
tryCreatePartitionAsync(i);
futures.add(tryCreatePartitionAsync(i, null));
}
return FutureUtil.waitForAll(futures);
}

private void tryCreatePartitionAsync(final int partition) {
private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
result.complete(null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition);
tryCreatePartitionAsync(partition, result);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
});
return result;
}

protected NamespaceName namespaceName;
Expand Down Expand Up @@ -707,4 +717,98 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
partitionedTopics.sort(null);
return partitionedTopics;
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
if (numPartitions <= 0) {
asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
return;
}
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
} else {
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui why it is globalZk() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The partitioned topic metadata is stored in the global zookeeper.

if (KeeperException.Code.OK.intValue() == rc) {
globalZk().sync(path, (rc2, s2, ctx) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should happen in the callback of tryCreatePartitionAsync.

if (KeeperException.Code.OK.intValue() == rc2) {
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
// The partitioned topic is created but there are some partitions create failed
asyncResponse.resume(new RestException(e));
return null;
});
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
}
}, null);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
}
});
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
}).exceptionally(ex -> {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

/**
* Check the exists topics contains the given topic.
* Since there are topic partitions and non-partitioned topics in Pulsar, must ensure both partitions
* and non-partitioned topics are not duplicated. So, if compare with a partition name, we should compare
* to the partitioned name of this partition.
*
* @param topicName given topic name
*/
protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) {
return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(),
PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.thenCompose(topics -> {
boolean exists = false;
for (String topic : topics) {
if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) {
exists = true;
break;
}
}
return CompletableFuture.completedFuture(exists);
});
}

protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
if (throwable instanceof WebApplicationException) {
asyncResponse.resume(throwable);
} else {
asyncResponse.resume(new RestException(throwable));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.pulsar.common.api.proto.PulsarApi;

import static org.apache.pulsar.common.util.Codec.decode;

import com.github.zafarkhaja.semver.Version;
Expand Down Expand Up @@ -390,46 +390,6 @@ protected void internalRevokePermissionsOnTopic(String role) {
revokePermissions(topicName.toString(), role);
}

protected void internalCreatePartitionedTopic(int numPartitions) {
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
validatePartitionTopicName(topicName.getLocalName());
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "This topic already exists");
}
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
try {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
tryCreatePartitionsAsync(numPartitions);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
topicName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateNonPartitionTopicName(topicName.getLocalName());
Expand Down Expand Up @@ -540,11 +500,22 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
}
}

protected void internalCreateMissedPartitions() {
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
if (metadata != null) {
tryCreatePartitionsAsync(metadata.partitions);
}
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
if (metadata != null) {
tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
Expand Down Expand Up @@ -2071,40 +2042,6 @@ private void validatePartitionTopicUpdate(String topicName, int numberOfPartitio
}
}

/**
* Validate partitioned topic name.
* Validation will fail and throw RestException if
* 1) There's already a partitioned topic with same topic name and have some of its partition created.
* 2) There's already non partition topic with same name and contains partition suffix "-partition-"
* followed by numeric value. In this case internal created partition of partitioned topic could override
* the existing non partition topic.
*
* @param topicName
*/
private void validatePartitionTopicName(String topicName) {
List<String> existingTopicList = internalGetList();
String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
for (String existingTopicName : existingTopicList) {
if (existingTopicName.contains(prefix)) {
try {
Long.parseLong(existingTopicName.substring(
existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
log.warn("[{}] Already have topic {} which contains partition " +
"suffix '-partition-' and end with numeric value. Creation of partitioned topic {}"
+ "could cause conflict.", clientAppId(), existingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
"Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " +
"and end with numeric value, Creation of partitioned topic " + topicName +
" could cause conflict.");
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict with internal created partitioned topic's name.
}
}
}
}

/**
* Validate non partition topic name,
* Validation will fail and throw RestException if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,41 +124,15 @@ public PersistentTopicInternalStats getInternalStats(@PathParam("property") Stri
@ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic already exist") })
public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
.join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "This topic already exists");
}
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
try {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
validateTopicName(property, cluster, namespace, encodedTopic);
internalCreatePartitionedTopic(asyncResponse, numPartitions);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

Expand Down
Loading