Skip to content

Commit

Permalink
atlas: fix race condition for QueryIndex
Browse files Browse the repository at this point in the history
Updates can happen concurrently with reads. This can
result in some of the volatile variables being set to
`null` after the `null` check occurs. Now they will get
copied to a local reference before being used for reads.
  • Loading branch information
brharrington committed Aug 2, 2024
1 parent 0fe1beb commit d10671f
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,13 @@ public boolean isEmpty() {
return matches.isEmpty()
&& equalChecks.values().stream().allMatch(QueryIndex::isEmpty)
&& otherChecks.values().stream().allMatch(QueryIndex::isEmpty)
&& (hasKeyIdx == null || hasKeyIdx.isEmpty())
&& (otherKeysIdx == null || otherKeysIdx.isEmpty())
&& (missingKeysIdx == null || missingKeysIdx.isEmpty());
&& isEmpty(hasKeyIdx)
&& isEmpty(otherKeysIdx)
&& isEmpty(missingKeysIdx);
}

private boolean isEmpty(QueryIndex<T> idx) {
return idx == null || idx.isEmpty();
}

/**
Expand Down Expand Up @@ -369,14 +373,15 @@ private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
// Matches for this level
matches.forEach(consumer);

if (key != null) {
final String keyRef = key;
if (keyRef != null) {

boolean keyPresent = false;

for (int j = i; j < tags.size(); ++j) {
String k = tags.getKey(j);
String v = tags.getValue(j);
int cmp = compare(k, key);
int cmp = compare(k, keyRef);
if (cmp == 0) {
keyPresent = true;

Expand Down Expand Up @@ -414,8 +419,9 @@ private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
}

// Check matches for has key
if (hasKeyIdx != null) {
hasKeyIdx.forEachMatch(tags, i, consumer);
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
hasKeyIdxRef.forEachMatch(tags, i, consumer);
}
}

Expand All @@ -426,13 +432,15 @@ private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
}

// Check matches with other keys
if (otherKeysIdx != null) {
otherKeysIdx.forEachMatch(tags, i, consumer);
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
otherKeysIdxRef.forEachMatch(tags, i, consumer);
}

// Check matches with missing keys
if (missingKeysIdx != null && !keyPresent) {
missingKeysIdx.forEachMatch(tags, i, consumer);
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null && !keyPresent) {
missingKeysIdxRef.forEachMatch(tags, i, consumer);
}
}
}
Expand Down Expand Up @@ -468,8 +476,9 @@ public void forEachMatch(Function<String, String> tags, Consumer<T> consumer) {
matches.forEach(consumer);

boolean keyPresent = false;
if (key != null) {
String v = tags.apply(key);
final String keyRef = key;
if (keyRef != null) {
String v = tags.apply(keyRef);
if (v != null) {
keyPresent = true;

Expand Down Expand Up @@ -507,20 +516,23 @@ public void forEachMatch(Function<String, String> tags, Consumer<T> consumer) {
}

// Check matches for has key
if (hasKeyIdx != null) {
hasKeyIdx.forEachMatch(tags, consumer);
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
hasKeyIdxRef.forEachMatch(tags, consumer);
}
}
}

// Check matches with other keys
if (otherKeysIdx != null) {
otherKeysIdx.forEachMatch(tags, consumer);
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
otherKeysIdxRef.forEachMatch(tags, consumer);
}

// Check matches with missing keys
if (missingKeysIdx != null && !keyPresent) {
missingKeysIdx.forEachMatch(tags, consumer);
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null && !keyPresent) {
missingKeysIdxRef.forEachMatch(tags, consumer);
}
}

Expand All @@ -545,8 +557,9 @@ public boolean couldMatch(Function<String, String> tags) {
}

boolean keyPresent = false;
if (key != null) {
String v = tags.apply(key);
final String keyRef = key;
if (keyRef != null) {
String v = tags.apply(keyRef);
if (v != null) {
keyPresent = true;

Expand All @@ -571,14 +584,16 @@ public boolean couldMatch(Function<String, String> tags) {
}

// Check matches for has key
if (hasKeyIdx != null && hasKeyIdx.couldMatch(tags)) {
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null && hasKeyIdxRef.couldMatch(tags)) {
return true;
}
}
}

// Check matches with other keys
if (otherKeysIdx != null && otherKeysIdx.couldMatch(tags)) {
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null && otherKeysIdxRef.couldMatch(tags)) {
return true;
}

Expand Down Expand Up @@ -625,11 +640,12 @@ private void findHotSpots(
Deque<String> path,
BiConsumer<List<String>, List<Query.KeyQuery>> consumer
) {
if (key != null) {
path.addLast("K=" + key);
final String keyRef = key;
if (keyRef != null) {
path.addLast("K=" + keyRef);

equalChecks.forEach((v, idx) -> {
path.addLast(key + "," + v + ",:eq");
path.addLast(keyRef + "," + v + ",:eq");
idx.findHotSpots(threshold, path, consumer);
path.removeLast();
});
Expand All @@ -646,24 +662,27 @@ private void findHotSpots(
});
path.removeLast();

if (hasKeyIdx != null) {
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
path.addLast("has");
hasKeyIdx.findHotSpots(threshold, path, consumer);
hasKeyIdxRef.findHotSpots(threshold, path, consumer);
path.removeLast();
}

path.removeLast();
}

if (otherKeysIdx != null) {
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
path.addLast("other-keys");
otherKeysIdx.findHotSpots(threshold, path, consumer);
otherKeysIdxRef.findHotSpots(threshold, path, consumer);
path.removeLast();
}

if (missingKeysIdx != null) {
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null) {
path.addLast("missing-keys");
missingKeysIdx.findHotSpots(threshold, path, consumer);
missingKeysIdxRef.findHotSpots(threshold, path, consumer);
path.removeLast();
}
}
Expand All @@ -682,8 +701,9 @@ private StringBuilder indent(StringBuilder builder, int n) {
}

private void buildString(StringBuilder builder, int n) {
if (key != null) {
indent(builder, n).append("key: [").append(key).append("]\n");
final String keyRef = key;
if (keyRef != null) {
indent(builder, n).append("key: [").append(keyRef).append("]\n");
}
if (!equalChecks.isEmpty()) {
indent(builder, n).append("equal checks:\n");
Expand All @@ -699,17 +719,20 @@ private void buildString(StringBuilder builder, int n) {
idx.buildString(builder, n + 1);
});
}
if (hasKeyIdx != null) {
final QueryIndex<T> hasKeyIdxRef = hasKeyIdx;
if (hasKeyIdxRef != null) {
indent(builder, n).append("has key:\n");
hasKeyIdx.buildString(builder, n + 1);
hasKeyIdxRef.buildString(builder, n + 1);
}
if (otherKeysIdx != null) {
final QueryIndex<T> otherKeysIdxRef = otherKeysIdx;
if (otherKeysIdxRef != null) {
indent(builder, n).append("other keys:\n");
otherKeysIdx.buildString(builder, n + 1);
otherKeysIdxRef.buildString(builder, n + 1);
}
if (missingKeysIdx != null) {
final QueryIndex<T> missingKeysIdxRef = missingKeysIdx;
if (missingKeysIdxRef != null) {
indent(builder, n).append("missing keys:\n");
missingKeysIdx.buildString(builder, n + 1);
missingKeysIdxRef.buildString(builder, n + 1);
}
if (!matches.isEmpty()) {
indent(builder, n).append("matches:\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class QueryIndexTest {
Expand Down Expand Up @@ -589,4 +590,50 @@ public void couldMatchPartial() {
Assertions.assertFalse(idx.couldMatch(Query.toMap(id("foo2", "id", "bar"))::get));
Assertions.assertFalse(idx.couldMatch(Query.toMap(id("foo", "app", "bar-main"))::get));
}

@Test
public void updateRaceCondition() throws Exception {
Query q1 = Parser.parseQuery("name,test,:eq");
// Will be placed in otherKeysIdx for parent
Query q2 = Parser.parseQuery("foo,bar,:eq");

QueryIndex<Query> idx = QueryIndex.newInstance(new NoopRegistry());
idx.add(q1, q1);

final int N = 1_000_000;
final List<Thread> threads = new ArrayList<>(16);

// Query tasks
final AtomicInteger matches = new AtomicInteger();
final Id id = Id.create("test").withTag("foo", "bar");
for (int i = 0; i < 16; ++i) {
Thread t = new Thread(() -> {
for (int j = 0; j < N; ++j) {
matches.addAndGet(idx.findMatches(id).size());
}
});
t.start();
threads.add(t);
}

// Update by adding and removing q2 to lead to race condition for accessing
// volatile reference
Thread updater = new Thread(() -> {
for (int j = 0; j < N; ++j) {
idx.add(q2, q2);
idx.remove(q2, q2);
}
});
updater.start();
updater.join();

// Wait for read tasks to complete
threads.forEach(t -> {
try {
t.join();
} catch (Exception e) {
Assertions.fail(e);
}
});
}
}

0 comments on commit d10671f

Please sign in to comment.