Skip to content

Commit

Permalink
[improve][broker] Remove locallyAcquiredLock when removeOwnership (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
leizhiyuan authored Oct 27, 2022
1 parent c7990b9 commit b061c6a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -207,7 +208,7 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
*
*/
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
ResourceLock<NamespaceEphemeralData> lock = locallyAcquiredLocks.get(bundle);
ResourceLock<NamespaceEphemeralData> lock = locallyAcquiredLocks.remove(bundle);
if (lock == null) {
// We don't own the specified bundle anymore
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -328,6 +329,12 @@ public void invalidateLocalOwnerCache(NamespaceBundle namespaceBundle) {
this.ownedBundlesCache.synchronous().invalidate(namespaceBundle);
}

@VisibleForTesting
public Map<NamespaceBundle, ResourceLock<NamespaceEphemeralData>> getLocallyAcquiredLocks() {
return locallyAcquiredLocks;
}


public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public void testRemoveOwnership() throws Exception {
Awaitility.await().untilAsserted(() -> {
assertTrue(cache.getOwnedBundles().isEmpty());
assertFalse(store.exists(ServiceUnitUtils.path(bundle)).join());
assertNull(cache.getLocallyAcquiredLocks().get(bundle));
});
}

Expand Down

0 comments on commit b061c6a

Please sign in to comment.