Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker]Fix deadlock of metadata store #20189

Merged
merged 10 commits into from
May 18, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -1137,16 +1139,17 @@ public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName)
new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
}

/**
* @Deprecated This method is only used in test now.
*/
@Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
try {
Copy link
Member

@lhotari lhotari May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to use something like isServiceUnitActiveAsync(topicName).get(conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari

I think it is a good suggestion. Already edit this method to make the test that calls the method works better.

Could you take a look again?

OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(getBundle(topicName));
if (ownedBundle == null) {
return false;
}
return ownedBundle.isActive();
} catch (Exception e) {
LOG.warn("Unable to find OwnedBundle for topic - [{}]", topicName, e);
return false;
return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
throw new RuntimeException(e);
}
}

Expand All @@ -1156,12 +1159,13 @@ public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName)
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
Optional<CompletableFuture<OwnedBundle>> res = ownershipCache.getOwnedBundleAsync(getBundle(topicName));
if (!res.isPresent()) {
return CompletableFuture.completedFuture(false);
}

return res.get().thenApply(ob -> ob != null && ob.isActive());
return getBundleAsync(topicName).thenCompose(bundle -> {
Optional<CompletableFuture<OwnedBundle>> optionalFuture = ownershipCache.getOwnedBundleAsync(bundle);
if (!optionalFuture.isPresent()) {
return CompletableFuture.completedFuture(false);
}
return optionalFuture.get().thenApply(ob -> ob != null && ob.isActive());
});
}

private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ public Optional<CompletableFuture<OwnedBundle>> getOwnedBundleAsync(NamespaceBun

/**
* Disable bundle in local cache and on zk.
*
* @param bundle
* @throws Exception
* @Deprecated This is a dangerous method which is currently only used for test, it will occupy the ZK thread.
* Please switch to your own thread after calling this method.
*/
@Deprecated
public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
return updateBundleState(bundle, false)
.thenCompose(__ -> {
Expand Down