Skip to content

Commit

Permalink
[Issue 12723] Fix race condition in PersistentTopic#addReplicationClu…
Browse files Browse the repository at this point in the history
…ster (apache#12729)

### Motivation

See apache#12723

### Modifications

Add a method org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap#removeNullValue to remove null value   in a thread safe way.
  • Loading branch information
Jason918 authored Nov 15, 2021
1 parent 22f2270 commit a3fe00e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,7 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
replicators.computeIfAbsent(remoteCluster, r -> {
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
Expand All @@ -1570,8 +1570,8 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
});

// clean up replicator if startup is failed
if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) {
replicators.remove(remoteCluster);
if (replicator == null) {
replicators.removeNullValue(remoteCluster);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -43,6 +43,27 @@ public class ConcurrentOpenHashMap<K, V> {
private static final Object EmptyKey = null;
private static final Object DeletedKey = new Object();

/**
* This object is used to delete empty value in this map.
* EmptyValue.equals(null) = true.
*/
private static final Object EmptyValue = new Object() {

@SuppressFBWarnings
@Override
public boolean equals(Object obj) {
return obj == null;
}

/**
* This is just for avoiding spotbugs errors
*/
@Override
public int hashCode() {
return super.hashCode();
}
};

private static final float MapFillFactor = 0.66f;

private static final int DefaultExpectedItems = 256;
Expand Down Expand Up @@ -143,6 +164,10 @@ public boolean remove(K key, Object value) {
return getSection(h).remove(key, value, (int) h) != null;
}

public void removeNullValue(K key) {
remove(key, EmptyValue);
}

private Section<K, V> getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -369,6 +370,37 @@ public boolean equals(Object obj) {
assertNull(map.get(t1_b));
}

@Test
public void testNullValue() {
ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
String key = "a";
assertThrows(NullPointerException.class, () -> map.put(key, null));

//put a null value.
assertNull(map.computeIfAbsent(key, k -> null));
assertEquals(1, map.size());
assertEquals(1, map.keys().size());
assertEquals(1, map.values().size());
assertNull(map.get(key));
assertFalse(map.containsKey(key));

//test remove null value
map.removeNullValue(key);
assertTrue(map.isEmpty());
assertEquals(0, map.keys().size());
assertEquals(0, map.values().size());
assertNull(map.get(key));
assertFalse(map.containsKey(key));


//test not remove non-null value
map.put(key, "V");
assertEquals(1, map.size());
map.removeNullValue(key);
assertEquals(1, map.size());

}

static final int Iterations = 1;
static final int ReadIterations = 1000;
static final int N = 1_000_000;
Expand Down

0 comments on commit a3fe00e

Please sign in to comment.