Skip to content

Commit

Permalink
Perform buildAggregation concurrently and support Composite Aggregations
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Mar 18, 2024
1 parent 1f5df54 commit 9f55c21
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.List;

import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
Expand All @@ -50,23 +51,25 @@ public void setupSuiteScopeCluster() throws Exception {
assertAcked(
prepareCreate(
"idx",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
);
waitForRelocation(ClusterHealthStatus.GREEN);

client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
refresh("idx");
indexRandom(
true,
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
);

waitForRelocation(ClusterHealthStatus.GREEN);
refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
Expand Down Expand Up @@ -56,17 +56,9 @@ public String getCollectorReason() {

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
assert internals.stream().noneMatch(Objects::isNull);
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {
try {
// post collection is called in ContextIndexSearcher after search on leaves are completed
internals.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
return buildAggregationResult(internalAggregations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.search.aggregations;

import org.opensearch.OpenSearchParseException;
import org.opensearch.common.SetOnce;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -61,6 +62,8 @@
@PublicApi(since = "1.0.0")
public abstract class Aggregator extends BucketCollector implements Releasable {

final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();

/**
* Parses the aggregation request and creates the appropriate aggregator factory for it.
*
Expand All @@ -83,6 +86,20 @@ public interface Parser {
AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
}

/**
* Stores an InternalAggregation
*/
public void setInternalAggregation(InternalAggregation internalAggregation) {
this.internalAggregation.set(internalAggregation);
}

/**
* Returns the stored InternalAggregation
*/
public InternalAggregation getInternalAggregation() {
return internalAggregation.get();
}

/**
* Return the name of this aggregator.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ public void postCollection() throws IOException {
collectableSubAggregators.postCollection();
}

/**
* For top level aggregators, build the InternalAggregations object and store it in class field.
*/
public void buildAndSetInternalAggregation() throws IOException {
if (parent == null) {
setInternalAggregation(buildTopLevel());
}
}

/** Called upon release of the aggregator. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
import org.opensearch.search.profile.query.InternalProfileCollector;

import java.io.IOException;
Expand All @@ -22,6 +23,7 @@
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;

/**
Expand Down Expand Up @@ -63,6 +65,7 @@ public void processPostCollection(Collector collectorTree) throws IOException {
while (!collectors.isEmpty()) {
Collector currentCollector = collectors.poll();
if (currentCollector instanceof InternalProfileCollector) {
// Profile collector should be the top level one so we should be able to call buildAggregation on it here
collectors.offer(((InternalProfileCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MinimumScoreCollector) {
collectors.offer(((MinimumScoreCollector) currentCollector).getCollector());
Expand All @@ -72,6 +75,19 @@ public void processPostCollection(Collector collectorTree) throws IOException {
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();

// Perform build aggregation during post collection
if (currentCollector instanceof AggregatorBase) {
((AggregatorBase) currentCollector).buildAndSetInternalAggregation();
} else if (currentCollector instanceof ProfilingAggregator) {
((ProfilingAggregator) currentCollector).setInternalAggregation(
((ProfilingAggregator) currentCollector).buildTopLevel()
);
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
collectors.offer(innerCollector);
}
}
}
}
}
Expand Down Expand Up @@ -106,4 +122,64 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
}
return aggregators;
}

/**
* Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The
* input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
* during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before
* returning response to coordinator
* @param collectors collection of aggregation collectors to reduce
* @return list of unwrapped {@link InternalAggregation}
*/
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) throws IOException {
List<InternalAggregation> internalAggregations = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof AggregatorBase) {
internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation());
} else if (currentCollector instanceof ProfilingAggregator) {
internalAggregations.add(((ProfilingAggregator) currentCollector).getInternalAggregation());
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof AggregatorBase) {
internalAggregations.add(
((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation()
);
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}

// Check that internalAggregations does not contain any null objects as that means postCollection was not called for a given
// collector. This can happen as collect will not get called whenever there are no leaves on a shard. Since we build the
// InternalAggregation in postCollection that will not get called in such cases either. Therefore we need to manually call it again
// here to build empty InternalAggregation objects for this collector tree.
if (internalAggregations.stream().anyMatch(Objects::isNull)) {
allCollectors.addAll(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof AggregatorBase) {
((AggregatorBase) currentCollector).buildAndSetInternalAggregation();
} else if (currentCollector instanceof ProfilingAggregator) {
((ProfilingAggregator) currentCollector).setInternalAggregation(
((ProfilingAggregator) currentCollector).buildTopLevel()
);
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
allCollectors.offer(innerCollector);
}
}
}
// Iterate through collector tree again to get InternalAggregations object
return toInternalAggregations(collectors);
} else {
return internalAggregations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static class MultiBucketConsumer implements IntConsumer {

// aggregations execute in a single thread for both sequential
// and concurrent search, so no atomic here
private int count;
private final LongAdder count;

// will be updated by multiple threads in concurrent search
// hence making it as LongAdder
Expand All @@ -145,6 +145,7 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
this.limit = limit;
this.breaker = breaker;
callCount = new LongAdder();
count = new LongAdder();
availProcessors = Runtime.getRuntime().availableProcessors();
}

Expand All @@ -158,6 +159,7 @@ protected MultiBucketConsumer(
) {
this.limit = limit;
this.breaker = breaker;
this.count = new LongAdder();
this.callCount = callCount;
this.circuitBreakerTripped = circuitBreakerTripped;
this.availProcessors = availProcessors;
Expand All @@ -166,8 +168,9 @@ protected MultiBucketConsumer(
@Override
public void accept(int value) {
if (value != 0) {
count += value;
if (count > limit) {
count.add(value);
;
if (count.intValue() > limit) {
throw new TooManyBucketsException(
"Trying to create too many buckets. Must be less than or equal to: ["
+ limit
Expand Down Expand Up @@ -205,11 +208,11 @@ public void accept(int value) {
}

public void reset() {
this.count = 0;
this.count.reset();
}

public int getCount() {
return count;
return count.intValue();
}

public int getLimit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

/**
Expand Down Expand Up @@ -80,7 +81,7 @@ protected Aggregator createInternal(

@Override
protected boolean supportsConcurrentSegmentSearch() {
// See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return false;
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return Arrays.stream(sources).allMatch(CompositeValuesSourceConfig::hasScript) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
private final long valueCount;
private final String fieldName;
private Weight weight;
private final GlobalOrdLookupFunction lookupGlobalOrd;
protected final CollectionStrategy collectionStrategy;
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
Expand Down Expand Up @@ -129,11 +128,10 @@ public GlobalOrdinalsStringTermsAggregator(
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
final IndexReader reader = context.searcher().getIndexReader();
final SortedSetDocValues values = reader.leaves().size() > 0
final SortedSetDocValues values = !reader.leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
this.valueCount = values.getValueCount();
this.lookupGlobalOrd = values::lookupOrd;
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
if (remapGlobalOrds) {
this.collectionStrategy = new RemapGlobalOrds(cardinality);
Expand Down Expand Up @@ -885,7 +883,12 @@ PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
}

StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));

StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
result.bucketOrd = temp.bucketOrd;
result.docCountError = 0;
Expand Down Expand Up @@ -1001,7 +1004,11 @@ BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd)
long subsetSize = subsetSize(owningBucketOrd);
return (spare, globalOrd, bucketOrd, docCount) -> {
spare.bucketOrd = bucketOrd;
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}

@Override
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toInternalAggregations call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}
};

private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
Expand Down
Loading

0 comments on commit 9f55c21

Please sign in to comment.