Skip to content

Commit

Permalink
[improve][broker]Ensure namespace deletion doesn't fail (#22627)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored May 13, 2024
1 parent 16556fa commit 936afec
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,21 @@ protected CompletableFuture<Void> deleteAsync(String path) {
}

protected CompletableFuture<Void> deleteIfExistsAsync(String path) {
return cache.exists(path).thenCompose(exists -> {
if (!exists) {
return CompletableFuture.completedFuture(null);
log.info("Deleting path: {}", path);
CompletableFuture<Void> future = new CompletableFuture<>();
cache.delete(path).whenComplete((ignore, ex) -> {
if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) {
log.info("Path {} did not exist in metadata store", path);
future.complete(null);
} else if (ex != null) {
log.info("Failed to delete path from metadata store: {}", path, ex);
future.completeExceptionally(ex);
} else {
log.info("Deleted path from metadata store: {}", path);
future.complete(null);
}
CompletableFuture<Void> future = new CompletableFuture<>();
cache.delete(path).whenComplete((ignore, ex) -> {
if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) {
future.complete(null);
} else if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(null);
}
});
return future;
});
return future;
}

protected boolean exists(String path) throws MetadataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void deleteLocalPolicies(NamespaceName ns) throws MetadataStoreException
}

public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
}

public CompletableFuture<Void> deleteLocalPoliciesTenantAsync(String tenant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void deletePolicies(NamespaceName ns) throws MetadataStoreException{
}

public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
}

public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreException{
Expand Down Expand Up @@ -155,10 +155,18 @@ public static boolean pathIsNamespaceLocalPolicies(String path) {
&& path.substring(LOCAL_POLICIES_ROOT.length() + 1).contains("/");
}

// clear resource of `/namespace/{namespaceName}` for zk-node
/**
* Clear resource of `/namespace/{namespaceName}` for zk-node.
* @param ns the namespace name
* @return a handle to the results of the operation
* */
//
public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString());
return deleteIfExistsAsync(namespacePath);
// please beware that this will delete all the children of the namespace
// including the ownership nodes (ephemeral nodes)
// see ServiceUnitUtils.path(ns) for the ownership node path
return getStore().deleteRecursive(namespacePath);
}

// clear resource of `/namespace/{tenant}` for zk-node
Expand Down Expand Up @@ -303,11 +311,14 @@ public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {

public CompletableFuture<Void> clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
final String globalPartitionedPath = joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
log.info("Clearing partitioned topic metadata for namespace {}, path is {}",
namespaceName, globalPartitionedPath);
return getStore().deleteRecursive(globalPartitionedPath);
}

public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant);
log.info("Clearing partitioned topic metadata for tenant {}, path is {}", tenant, partitionedTopicPath);
return deleteIfExistsAsync(partitionedTopicPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ public CompletableFuture<List<String>> getExistingPartitions(NamespaceName ns, T
);
}

public CompletableFuture<Void> deletePersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.delete(path, Optional.of(-1L));
}

public CompletableFuture<Void> createPersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.put(path, new byte[0], Optional.of(-1L))
Expand All @@ -93,38 +88,20 @@ public CompletableFuture<Boolean> persistentTopicExists(TopicName topic) {

public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns;
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing namespace persistence for namespace: {}, path {}", ns, path);
return store.deleteIfExists(path, Optional.empty());
}

public CompletableFuture<Void> clearDomainPersistence(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing domain persistence for namespace: {}, path {}", ns, path);
return store.deleteIfExists(path, Optional.empty());
}

public CompletableFuture<Void> clearTenantPersistence(String tenant) {
String path = MANAGED_LEDGER_PATH + "/" + tenant;
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.deleteRecursive(path);
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing tenant persistence for tenant: {}, path {}", tenant, path);
return store.deleteRecursive(path);
}

void handleNotification(Notification notification) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,14 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
clientAppId(), ex);
return FutureUtil.failedFuture(ex);
}
log.info("[{}] Deleting namespace bundle {}/{}", clientAppId(),
namespaceName, bundle.getBundleRange());
return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(),
bundle.getBundleRange(), force);
} else {
log.warn("[{}] Skipping deleting namespace bundle {}/{} "
+ "as it's not owned by any broker",
clientAppId(), namespaceName, bundle.getBundleRange());
}
return CompletableFuture.completedFuture(null);
})
Expand All @@ -321,16 +327,20 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
final Throwable rc = FutureUtil.unwrapCompletionException(error);
if (rc instanceof MetadataStoreException) {
if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) {
KeeperException.NotEmptyException ne =
(KeeperException.NotEmptyException) rc.getCause();
log.info("[{}] There are in-flight topics created during the namespace deletion, "
+ "retry to delete the namespace again.", namespaceName);
+ "retry to delete the namespace again. (path {} is not empty on metadata)",
namespaceName, ne.getPath());
final int next = retryTimes - 1;
if (next > 0) {
// async recursive
internalRetryableDeleteNamespaceAsync0(force, next, callback);
} else {
callback.completeExceptionally(
new RestException(Status.CONFLICT, "The broker still have in-flight topics"
+ " created during namespace deletion, please try again."));
+ " created during namespace deletion (path " + ne.getPath() + ") "
+ "is not empty on metadata store, please try again."));
// drop out recursive
}
return;
Expand Down Expand Up @@ -476,6 +486,8 @@ protected CompletableFuture<Void> internalClearZkSources() {
@SuppressWarnings("deprecation")
protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
boolean force) {
log.info("[{}] Deleting namespace bundle {}/{} authoritative:{} force:{}",
clientAppId(), namespaceName, bundleRange, authoritative, force);
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader)
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarClientException.AlreadyClosedException) {
log.warn("Read more topic policies exception, close the read now!", ex);
log.info("Closing the topic policies reader for {}",
reader.getSystemTopic().getTopicName());
cleanCacheAndCloseReader(
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Metadata store client interface.
Expand All @@ -36,6 +39,8 @@
@Beta
public interface MetadataStore extends AutoCloseable {

Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class);

/**
* Read the value of one key, identified by the path
*
Expand Down Expand Up @@ -121,6 +126,23 @@ default CompletableFuture<Void> sync(String path) {
*/
CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);

default CompletableFuture<Void> deleteIfExists(String path, Optional<Long> expectedVersion) {
return delete(path, expectedVersion)
.exceptionally(e -> {
if (e.getCause() instanceof NotFoundException) {
LOGGER.info("Path {} not found while deleting (this is not a problem)", path);
return null;
} else {
if (expectedVersion.isEmpty()) {
LOGGER.info("Failed to delete path {}", path, e);
} else {
LOGGER.info("Failed to delete path {} with expected version {}", path, expectedVersion, e);
}
throw new CompletionException(e);
}
});
}

/**
* Delete a key-value pair and all the children nodes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public void accept(Notification n) {

@Override
public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
log.info("Deleting path: {} (v. {})", path, expectedVersion);
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
Expand Down Expand Up @@ -405,11 +406,13 @@ private CompletableFuture<Void> deleteInternal(String path, Optional<Long> expec
}

metadataCaches.forEach(c -> c.invalidate(path));
log.info("Deleted path: {} (v. {})", path, expectedVersion);
});
}

@Override
public CompletableFuture<Void> deleteRecursive(String path) {
log.info("Deleting recursively path: {}", path);
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
Expand All @@ -419,13 +422,9 @@ public CompletableFuture<Void> deleteRecursive(String path) {
children.stream()
.map(child -> deleteRecursive(path + "/" + child))
.collect(Collectors.toList())))
.thenCompose(__ -> exists(path))
.thenCompose(exists -> {
if (exists) {
return delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
.thenCompose(__ -> {
log.info("After deleting all children, now deleting path: {}", path);
return deleteIfExists(path, Optional.empty());
});
}

Expand Down

0 comments on commit 936afec

Please sign in to comment.