
Commit

Ensure read-lock is not continuously held on a section while iterating over concurrent maps (#9787)

* Ensure read-lock is not continuously held on a section while iterating over concurrent maps

* Added try/finally
merlimat committed May 22, 2021
1 parent a19db1d commit 4c369c9
Showing 4 changed files with 118 additions and 140 deletions.
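
The change applies one pattern to each Section.forEach implementation: instead of escalating to a read lock that is then held for the remainder of the iteration (the old acquiredReadLock flag), the loop validates the optimistic stamp per bucket, re-reads a single slot under a briefly held read lock when validation fails, and always invokes the processor callback with no lock held. Below is a minimal standalone sketch of that pattern, simplified to a plain long[] table of items; the class and field names (SectionSketch, EmptyItem) are illustrative, not Pulsar's, though Pulsar's Section classes do extend StampedLock in the same way.

import java.util.concurrent.locks.StampedLock;
import java.util.function.LongConsumer;

// Minimal sketch of the per-bucket optimistic-read pattern used in this commit.
class SectionSketch extends StampedLock {
    private static final long EmptyItem = -1L;
    private volatile long[] table = new long[]{1, 2, EmptyItem, 3};

    void forEach(LongConsumer processor) {
        // Snapshot the table reference: if a rehash swaps in a new array,
        // we simply keep iterating over the old snapshot.
        long[] table = this.table;

        long stamp = 0;
        for (int bucket = 0; bucket < table.length; bucket++) {
            if (stamp == 0) {
                // Renew the optimistic stamp only after a validation failure.
                stamp = tryOptimisticRead();
            }

            long stored = table[bucket];

            if (!validate(stamp)) {
                // A write raced with us: re-read just this bucket under a
                // read lock that is released immediately afterwards.
                stamp = readLock();
                try {
                    stored = table[bucket];
                } finally {
                    unlockRead(stamp);
                }
                stamp = 0; // take a fresh optimistic stamp on the next iteration
            }

            // The callback always runs with no lock held.
            if (stored != EmptyItem) {
                processor.accept(stored);
            }
        }
    }
}

Compared with the pre-PR code, a validation failure no longer poisons the rest of the loop: the stamp is reset to 0 and the next bucket starts with a fresh tryOptimisticRead().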
ConcurrentLongHashMap.java

@@ -154,14 +154,14 @@ private Section<V> getSection(long hash) {
     }

     public void clear() {
-        for (Section<V> s : sections) {
-            s.clear();
+        for (int i = 0; i < sections.length; i++) {
+            sections[i].clear();
         }
     }

     public void forEach(EntryProcessor<V> processor) {
-        for (Section<V> s : sections) {
-            s.forEach(processor);
+        for (int i = 0; i < sections.length; i++) {
+            sections[i].forEach(processor);
         }
     }

@@ -394,46 +394,48 @@ void clear() {
     public void forEach(EntryProcessor<V> processor) {
         long stamp = tryOptimisticRead();

         // We need to make sure that we read these 3 variables in a consistent way
         int capacity = this.capacity;
         long[] keys = this.keys;
         V[] values = this.values;

-        boolean acquiredReadLock = false;
-
-        try {
-
-            // Validate no rehashing
-            if (!validate(stamp)) {
-                // Fallback to read lock
-                stamp = readLock();
-                acquiredReadLock = true;
-
-                capacity = this.capacity;
-                keys = this.keys;
-                values = this.values;
-            }
-
-            // Go through all the buckets for this section
-            for (int bucket = 0; bucket < capacity; bucket++) {
-                long storedKey = keys[bucket];
-                V storedValue = values[bucket];
-
-                if (!acquiredReadLock && !validate(stamp)) {
-                    // Fallback to acquiring read lock
-                    stamp = readLock();
-                    acquiredReadLock = true;
-
-                    storedKey = keys[bucket];
-                    storedValue = values[bucket];
-                }
-
-                if (storedValue != DeletedValue && storedValue != EmptyValue) {
-                    processor.accept(storedKey, storedValue);
-                }
-            }
-        } finally {
-            if (acquiredReadLock) {
-                unlockRead(stamp);
-            }
-        }
+        // Validate no rehashing
+        if (!validate(stamp)) {
+            // Fallback to read lock
+            stamp = readLock();
+
+            capacity = this.capacity;
+            keys = this.keys;
+            values = this.values;
+            unlockRead(stamp);
+        }
+
+        // Go through all the buckets for this section. We try to renew the stamp only after a validation
+        // error, otherwise we keep going with the same.
+        for (int bucket = 0; bucket < capacity; bucket++) {
+            if (stamp == 0) {
+                stamp = tryOptimisticRead();
+            }
+
+            long storedKey = keys[bucket];
+            V storedValue = values[bucket];
+
+            if (!validate(stamp)) {
+                // Fallback to acquiring read lock
+                stamp = readLock();
+
+                try {
+                    storedKey = keys[bucket];
+                    storedValue = values[bucket];
+                } finally {
+                    unlockRead(stamp);
+                }
+
+                stamp = 0;
+            }
+
+            if (storedValue != DeletedValue && storedValue != EmptyValue) {
+                processor.accept(storedKey, storedValue);
+            }
+        }
     }

ConcurrentLongPairSet.java

@@ -87,23 +87,23 @@ public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel) {

     public long size() {
         long size = 0;
-        for (Section s : sections) {
-            size += s.size;
+        for (int i = 0; i < sections.length; i++) {
+            size += sections[i].size;
         }
         return size;
     }

     public long capacity() {
         long capacity = 0;
-        for (Section s : sections) {
-            capacity += s.capacity;
+        for (int i = 0; i < sections.length; i++) {
+            capacity += sections[i].capacity;
         }
         return capacity;
     }

     public boolean isEmpty() {
-        for (Section s : sections) {
-            if (s.size != 0) {
+        for (int i = 0; i < sections.length; i++) {
+            if (sections[i].size != 0) {
                 return false;
             }
         }
@@ -112,8 +112,8 @@ public boolean isEmpty() {

     long getUsedBucketCount() {
         long usedBucketCount = 0;
-        for (Section s : sections) {
-            usedBucketCount += s.usedBuckets;
+        for (int i = 0; i < sections.length; i++) {
+            usedBucketCount += sections[i].usedBuckets;
         }
         return usedBucketCount;
     }
@@ -155,8 +155,8 @@ public void clear() {
     }

     public void forEach(LongPairConsumer processor) {
-        for (Section s : sections) {
-            s.forEach(processor);
+        for (int i = 0; i < sections.length; i++) {
+            sections[i].forEach(processor);
         }
     }

@@ -170,8 +170,8 @@ public void forEach(LongPairConsumer processor) {
      */
     public int removeIf(LongPairPredicate filter) {
         int removedValues = 0;
-        for (Section s : sections) {
-            removedValues += s.removeIf(filter);
+        for (int i = 0; i < sections.length; i++) {
+            removedValues += sections[i].removeIf(filter);
         }
         return removedValues;
     }
@@ -195,8 +195,8 @@ public Set<LongPair> items(int numberOfItems) {
     @Override
     public <T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter) {
         Set<T> items = new HashSet<>();
-        for (Section s : sections) {
-            s.forEach((item1, item2) -> {
+        for (int i = 0; i < sections.length; i++) {
+            sections[i].forEach((item1, item2) -> {
                 if (items.size() < numberOfItems) {
                     items.add(longPairConverter.apply(item1, item2));
                 }
@@ -399,42 +399,35 @@ void clear() {
     public void forEach(LongPairConsumer processor) {
-        long stamp = tryOptimisticRead();
-
         long[] table = this.table;
-        boolean acquiredReadLock = false;

-        try {
-
-            // Validate no rehashing
-            if (!validate(stamp)) {
-                // Fallback to read lock
-                stamp = readLock();
-                acquiredReadLock = true;
-                table = this.table;
-            }
-
-            // Go through all the buckets for this section
-            for (int bucket = 0; bucket < table.length; bucket += 2) {
-                long storedItem1 = table[bucket];
-                long storedItem2 = table[bucket + 1];
-
-                if (!acquiredReadLock && !validate(stamp)) {
-                    // Fallback to acquiring read lock
-                    stamp = readLock();
-                    acquiredReadLock = true;
-
-                    storedItem1 = table[bucket];
-                    storedItem2 = table[bucket + 1];
-                }
-
-                if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
-                    processor.accept(storedItem1, storedItem2);
-                }
-            }
-        } finally {
-            if (acquiredReadLock) {
-                unlockRead(stamp);
-            }
-        }
+        // Go through all the buckets for this section. We try to renew the stamp only after a validation
+        // error, otherwise we keep going with the same.
+        long stamp = 0;
+        for (int bucket = 0; bucket < table.length; bucket += 2) {
+            if (stamp == 0) {
+                stamp = tryOptimisticRead();
+            }
+
+            long storedItem1 = table[bucket];
+            long storedItem2 = table[bucket + 1];
+
+            if (!validate(stamp)) {
+                // Fallback to acquiring read lock
+                stamp = readLock();
+
+                try {
+                    storedItem1 = table[bucket];
+                    storedItem2 = table[bucket + 1];
+                } finally {
+                    unlockRead(stamp);
+                }
+
+                stamp = 0;
+            }
+
+            if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
+                processor.accept(storedItem1, storedItem2);
+            }
+        }
     }

ConcurrentOpenHashMap.java

@@ -20,8 +20,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -150,28 +149,28 @@ private Section<K, V> getSection(long hash) {
     }

     public void clear() {
-        for (Section<K, V> s : sections) {
-            s.clear();
+        for (int i = 0; i < sections.length; i++) {
+            sections[i].clear();
         }
     }

     public void forEach(BiConsumer<? super K, ? super V> processor) {
-        for (Section<K, V> s : sections) {
-            s.forEach(processor);
+        for (int i = 0; i < sections.length; i++) {
+            sections[i].forEach(processor);
         }
     }

     /**
      * @return a new list of all keys (makes a copy)
      */
     public List<K> keys() {
-        List<K> keys = Lists.newArrayList();
+        List<K> keys = new ArrayList<>((int) size());
         forEach((key, value) -> keys.add(key));
         return keys;
     }

     public List<V> values() {
-        List<V> values = Lists.newArrayList();
+        List<V> values = new ArrayList<>((int) size());
         forEach((key, value) -> values.add(value));
         return values;
     }
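
A note on the keys()/values() change above: new ArrayList<>((int) size()) presizes the copy instead of starting from Guava's default-sized Lists.newArrayList(). Under concurrent writes size() is only a racy hint, but a close initial capacity avoids repeated growth of the backing array while forEach fills the list.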
@@ -355,42 +354,37 @@ void clear() {
     public void forEach(BiConsumer<? super K, ? super V> processor) {
-        long stamp = tryOptimisticRead();
-
+        // Take a reference to the data table, if there is a rehashing event, we'll be
+        // simply iterating over a snapshot of the data.
         Object[] table = this.table;
-        boolean acquiredReadLock = false;

-        try {
-
-            // Validate no rehashing
-            if (!validate(stamp)) {
-                // Fallback to read lock
-                stamp = readLock();
-                acquiredReadLock = true;
-                table = this.table;
-            }
-
-            // Go through all the buckets for this section
-            for (int bucket = 0; bucket < table.length; bucket += 2) {
-                K storedKey = (K) table[bucket];
-                V storedValue = (V) table[bucket + 1];
-
-                if (!acquiredReadLock && !validate(stamp)) {
-                    // Fallback to acquiring read lock
-                    stamp = readLock();
-                    acquiredReadLock = true;
-
-                    storedKey = (K) table[bucket];
-                    storedValue = (V) table[bucket + 1];
-                }
-
-                if (storedKey != DeletedKey && storedKey != EmptyKey) {
-                    processor.accept(storedKey, storedValue);
-                }
-            }
-        } finally {
-            if (acquiredReadLock) {
-                unlockRead(stamp);
-            }
-        }
+        // Go through all the buckets for this section. We try to renew the stamp only after a validation
+        // error, otherwise we keep going with the same.
+        long stamp = 0;
+        for (int bucket = 0; bucket < table.length; bucket += 2) {
+            if (stamp == 0) {
+                stamp = tryOptimisticRead();
+            }
+
+            K storedKey = (K) table[bucket];
+            V storedValue = (V) table[bucket + 1];
+
+            if (!validate(stamp)) {
+                // Fallback to acquiring read lock
+                stamp = readLock();
+
+                try {
+                    storedKey = (K) table[bucket];
+                    storedValue = (V) table[bucket + 1];
+                } finally {
+                    unlockRead(stamp);
+                }
+
+                stamp = 0;
+            }
+
+            if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                processor.accept(storedKey, storedValue);
+            }
+        }
     }
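
One hazard the old scheme invited, worth spelling out: StampedLock is not reentrant, so while forEach held the section's read lock, a processor callback that wrote back into the same section would block forever on writeLock(). A hedged sketch of that failure mode follows; the class and method names are illustrative, not from this PR.

import java.util.concurrent.locks.StampedLock;

// Illustrative failure mode: a callback that needs the write lock must never
// run while the same thread holds the read lock, because StampedLock does not
// support reentrancy.
public class ReadLockCallbackDeadlock {
    private final StampedLock lock = new StampedLock();

    void forEachHoldingReadLock(Runnable processor) {
        long stamp = lock.readLock();
        try {
            processor.run(); // old scheme: callback may run under the read lock
        } finally {
            lock.unlockRead(stamp);
        }
    }

    void put() {
        long stamp = lock.writeLock(); // never returns if this thread holds the read lock
        try {
            // insert into the table
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    public static void main(String[] args) {
        ReadLockCallbackDeadlock m = new ReadLockCallbackDeadlock();
        m.forEachHoldingReadLock(m::put); // deadlocks: writeLock() waits on our own read lock
    }
}

Under the new scheme the callback runs with no lock held, so a write-back from inside forEach takes the write lock normally instead of deadlocking; at worst it races with the iteration over which entries are observed.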
