Skip to content

Commit

Permalink
fix: doc values concurrent access by multiple threads
Browse files Browse the repository at this point in the history
  • Loading branch information
salvatore-campagna committed Aug 31, 2023
1 parent a3b5528 commit 9cc7a15
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ private static void executeInSortOrder(SearchContext context, BucketCollector co
searcher.setProfiler(context);
try {
searcher.search(context.rewrittenQuery(), collector);
collector.postCollection();
} catch (IOException e) {
throw new AggregationExecutionException("Could not perform time series aggregation", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public class GlobalOrdCardinalityAggregator extends NumericMetricsAggregator.Sin
// Build at post-collection phase
@Nullable
private HyperLogLogPlusPlusSparse counts;
private SortedSetDocValues values;
private ObjectArray<BitArray> visitedOrds;
private SortedSetDocValues values;

public GlobalOrdCardinalityAggregator(
String name,
Expand Down Expand Up @@ -105,16 +105,23 @@ public ScoreMode scoreMode() {
private class CompetitiveIterator extends DocIdSetIterator {

private final BitArray visitedOrds;
private final SortedSetDocValues values;
private long numNonVisitedOrds;
private final TermsEnum indexTerms;
private final DocIdSetIterator docsWithField;

CompetitiveIterator(int numNonVisitedOrds, BitArray visitedOrds, Terms indexTerms, DocIdSetIterator docsWithField)
throws IOException {
CompetitiveIterator(
SortedSetDocValues values,
int numNonVisitedOrds,
BitArray visitedOrds,
Terms indexTerms,
DocIdSetIterator docsWithField
) throws IOException {
this.visitedOrds = visitedOrds;
this.numNonVisitedOrds = numNonVisitedOrds;
this.indexTerms = Objects.requireNonNull(indexTerms).iterator();
this.docsWithField = docsWithField;
this.values = values;
}

private Map<Long, PostingsEnum> nonVisitedOrds;
Expand Down Expand Up @@ -211,6 +218,7 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx,
if (maxOrd <= MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING || numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
dynamicPruningAttempts++;
return new LeafBucketCollector() {
final SortedSetDocValues docValues = valuesSource.globalOrdinalsValues(aggCtx.getLeafReaderContext());

final BitArray bits;
final CompetitiveIterator competitiveIterator;
Expand All @@ -226,16 +234,16 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx,
}
this.bits = bits;
final DocIdSetIterator docsWithField = valuesSource.ordinalsValues(aggCtx.getLeafReaderContext());
competitiveIterator = new CompetitiveIterator(numNonVisitedOrds, bits, indexTerms, docsWithField);
competitiveIterator = new CompetitiveIterator(docValues, numNonVisitedOrds, bits, indexTerms, docsWithField);
if (numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
competitiveIterator.startPruning();
}
}

@Override
public void collect(int doc, long bucketOrd) throws IOException {
if (values.advanceExact(doc)) {
for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) {
if (docValues.advanceExact(doc)) {
for (long ord = docValues.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = docValues.nextOrd()) {
if (bits.getAndSet(ord) == false) {
competitiveIterator.onVisitedOrdinal(ord);
}
Expand Down Expand Up @@ -267,6 +275,8 @@ public CompetitiveIterator competitiveIterator() {

bruteForce++;
return new LeafBucketCollector() {
final SortedSetDocValues docValues = valuesSource.globalOrdinalsValues(aggCtx.getLeafReaderContext());

@Override
public void collect(int doc, long bucketOrd) throws IOException {
visitedOrds = bigArrays.grow(visitedOrds, bucketOrd + 1);
Expand All @@ -275,8 +285,8 @@ public void collect(int doc, long bucketOrd) throws IOException {
bits = new BitArray(maxOrd, bigArrays);
visitedOrds.set(bucketOrd, bits);
}
if (values.advanceExact(doc)) {
for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) {
if (docValues.advanceExact(doc)) {
for (long ord = docValues.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = docValues.nextOrd()) {
bits.set((int) ord);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ public void search(Query query, BucketCollector bucketCollector) throws IOExcept
Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1);
if (searcher.getExecutor() == null) {
search(bucketCollector, weight);
bucketCollector.postCollection();
return;
}
// Offload to the search worker thread pool whenever possible. The executor is null only when search.worker_threads_enabled is false.
RunnableFuture<Void> task = new FutureTask<>(() -> {
search(bucketCollector, weight);
bucketCollector.postCollection();
return null;
});
searcher.getExecutor().execute(task);
Expand Down

0 comments on commit 9cc7a15

Please sign in to comment.