From 5d369762fb8776401039405ebba9057fe2c66b0e Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Mon, 15 Nov 2021 08:52:31 +0800 Subject: [PATCH] [Issue 12723] Fix race condition in PersistentTopic#addReplicationCluster (#12729) See #12723 Add a method org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap#removeNullValue to remove null value in a thread safe way. (cherry picked from commit a3fe00efc4ccba55a0f28fd02b535c6624e3ed0a) --- .../service/persistent/PersistentTopic.java | 6 ++-- .../collections/ConcurrentOpenHashMap.java | 26 +++++++++++++++ .../ConcurrentOpenHashMapTest.java | 32 +++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c2320717f3549..275c7ea8989ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1552,7 +1552,7 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma .thenApply(clusterData -> brokerService.getReplicationClient(remoteCluster, clusterData))) .thenAccept(replicationClient -> { - replicators.computeIfAbsent(remoteCluster, r -> { + Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { try { return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster, brokerService, (PulsarClientImpl) replicationClient); @@ -1563,8 +1563,8 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma }); // clean up replicator if startup is failed - if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) { - replicators.remove(remoteCluster); + if (replicator == null) { + replicators.removeNullValue(remoteCluster); } }); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java index 47927a98eeaf3..2c7eed1b58e03 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -42,6 +43,27 @@ public class ConcurrentOpenHashMap { private static final Object EmptyKey = null; private static final Object DeletedKey = new Object(); + /** + * This object is used to delete empty value in this map. + * EmptyValue.equals(null) = true. + */ + private static final Object EmptyValue = new Object() { + + @SuppressFBWarnings + @Override + public boolean equals(Object obj) { + return obj == null; + } + + /** + * This is just for avoiding spotbugs errors + */ + @Override + public int hashCode() { + return super.hashCode(); + } + }; + private static final float MapFillFactor = 0.66f; private static final int DefaultExpectedItems = 256; @@ -142,6 +164,10 @@ public boolean remove(K key, Object value) { return getSection(h).remove(key, value, (int) h) != null; } + public void removeNullValue(K key) { + remove(key, EmptyValue); + } + private Section getSection(long hash) { // Use 32 msb out of long to get the section final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java index e18012cdf13f2..254be51f29239 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -369,6 +370,37 @@ public boolean equals(Object obj) { assertNull(map.get(t1_b)); } + @Test + public void testNullValue() { + ConcurrentOpenHashMap map = new ConcurrentOpenHashMap<>(16, 1); + String key = "a"; + assertThrows(NullPointerException.class, () -> map.put(key, null)); + + //put a null value. + assertNull(map.computeIfAbsent(key, k -> null)); + assertEquals(1, map.size()); + assertEquals(1, map.keys().size()); + assertEquals(1, map.values().size()); + assertNull(map.get(key)); + assertFalse(map.containsKey(key)); + + //test remove null value + map.removeNullValue(key); + assertTrue(map.isEmpty()); + assertEquals(0, map.keys().size()); + assertEquals(0, map.values().size()); + assertNull(map.get(key)); + assertFalse(map.containsKey(key)); + + + //test not remove non-null value + map.put(key, "V"); + assertEquals(1, map.size()); + map.removeNullValue(key); + assertEquals(1, map.size()); + + } + static final int Iterations = 1; static final int ReadIterations = 1000; static final int N = 1_000_000;