Refactor the filter rewrite optimization (opensearch-project#14464)
* Refactor

Split the single helper class into multiple classes and move them into a new package for the optimizations we introduced for the search path.
Rename the classes to make them more straightforward and general.

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Refactor

refactor the canOptimize logic
sort out the basic rules for how the aggregator provides data, and where to put common logic

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Refactor

refactor the data provider and the try-optimize logic

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Refactor

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Refactor

extract segment match all logic

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
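
A hedged sketch of what a segment match-all check typically looks like in Lucene terms; the actual helper is DateHistogramAggregatorBridge.segmentMatchAll(SearchContext, LeafReaderContext) and its exact logic may differ:

import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Weight;

// Illustrative only: a segment "functionally matches all docs" when the top-level
// query is known to match every live document in that segment.
static boolean segmentMatchAllSketch(Weight topLevelWeight, LeafReaderContext leafCtx) throws IOException {
    // Weight#count returns the exact hit count for the segment, or -1 if it cannot be computed cheaply.
    int count = topLevelWeight == null ? -1 : topLevelWeight.count(leafCtx);
    return count > 0 && count == leafCtx.reader().numDocs();
}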

* Refactor

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Refactor

inline class

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Fix a bug

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* address comment

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* prepareFromSegment now doesn't return Ranges

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* how it looks when introducing interfaces

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* remove interface, clean up

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* improve doc

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* move multirangetraversal logic to helper

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* improve the refactor

package name -> filterrewrite
move tree traversal logic to new class
add documentation for important abstract methods
add a subclass for the composite aggregation bridge

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Address Marc's comments

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Address concurrent segment search concern

To save the ranges per segment, change to a map that saves the ranges for each segment separately.

The document count increment function "incrementBucketDocCount" should already be thread-safe, as it's the same method used by the normal aggregation execution path.

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* remove circular dependency

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Address comment

- remove the map of segment ranges; pass ranges in by calling getRanges when needed
- use AtomicInteger for the debug info

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
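
A minimal sketch of the approach described above, with hypothetical names (the actual FilterRewriteOptimizationContext fields may differ): ranges are fetched on demand per segment instead of being cached in a map, and the debug counters use AtomicInteger so concurrent segment slices can update them safely.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; not the actual OpenSearch class.
class DebugInfoSketch {
    private final AtomicInteger segments = new AtomicInteger();
    private final AtomicInteger optimizedSegments = new AtomicInteger();

    // Called once per segment from whichever thread collects that segment.
    void recordSegment(boolean optimized) {
        segments.incrementAndGet();
        if (optimized) {
            optimizedSegments.incrementAndGet();
        }
    }
}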

---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
bowenlan-amzn authored and akolarkunnu committed Sep 10, 2024
1 parent 0ee56bc commit ddc01bf
Showing 14 changed files with 1,256 additions and 1,027 deletions.

This file was deleted.

@@ -73,8 +73,8 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper.AbstractDateHistogramAggregationType;
import org.opensearch.search.aggregations.bucket.filterrewrite.CompositeAggregatorBridge;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
@@ -89,13 +89,15 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

/**
* Main aggregator that aggregates docs from mulitple aggregations
* Main aggregator that aggregates docs from multiple aggregations
*
* @opensearch.internal
*/
@@ -118,9 +120,8 @@ public final class CompositeAggregator extends BucketsAggregator {

private boolean earlyTerminated;

private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
private LongKeyedBucketOrds bucketOrds = null;
private Rounding.Prepared preparedRounding = null;
private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
private LongKeyedBucketOrds bucketOrds;

CompositeAggregator(
String name,
@@ -166,57 +167,62 @@ public final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) {
return;
}
fastFilterContext.setAggregationType(new CompositeAggregationType());
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is used for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
fastFilterContext.buildRanges(sourceConfigs[0].fieldType());
}
}
CompositeAggregatorBridge bridge = new CompositeAggregatorBridge() {
private RoundingValuesSource valuesSource;
private long afterKey = -1L;

/**
* Currently the filter rewrite is only supported for date histograms
*/
public class CompositeAggregationType extends AbstractDateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;

public CompositeAggregationType() {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
@Override
protected boolean canOptimize() {
if (canOptimize(sourceConfigs)) {
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}

// bucketOrds is used for saving the date histogram results obtained from the optimization path
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
return true;
}
return false;
}
}

public Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}
@Override
protected void prepare() throws IOException {
buildRanges(context);
}

public Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
protected Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}

@Override
protected void processAfterKey(long[] bound, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bound[0] = afterKey + interval;
protected Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}
}

public int getSize() {
return size;
}
@Override
protected long[] processAfterKey(long[] bounds, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bounds[0] = afterKey + interval;
}
return bounds;
}

@Override
protected int getSize() {
return size;
}

@Override
protected Function<Long, Long> bucketOrdProducer() {
return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
}
};
filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
}

@Override
@@ -368,7 +374,7 @@ private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortFiel
return v2 != null && DocValues.unwrapSingleton(v2) == null;

default:
// we have no clue whether the field is multi-valued or not so we assume it is.
// we have no clue whether the field is multivalued or not so we assume it is.
return true;
}
}
@@ -551,11 +557,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = fastFilterContext.tryFastFilterAggregation(
ctx,
this::incrementBucketDocCount,
(key) -> bucketOrds.add(0, preparedRounding.round((long) key))
);
boolean optimized = filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
if (optimized) throw new CollectionTerminatedException();

finishLeaf();
@@ -709,11 +711,6 @@ private static class Entry {

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
if (fastFilterContext.optimizedSegments > 0) {
add.accept("optimized_segments", fastFilterContext.optimizedSegments);
add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments);
add.accept("leaf_visited", fastFilterContext.leaf);
add.accept("inner_visited", fastFilterContext.inner);
}
filterRewriteOptimizationContext.populateDebugInfo(add);
}
}
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.opensearch.index.mapper.MappedFieldType;

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* This abstract class provides a bridge between an aggregator and the optimization context, allowing
* the aggregator to provide data and optimize the aggregation process.
*
* <p>The main purpose of this class is to encapsulate the aggregator-specific optimization
* logic and provide access to the data in the aggregator that is required for the optimization, while keeping the
* optimization business logic separate from the aggregator implementation.
*
* <p>To optimize an aggregator with this class, subclass it in this package and put any aggregator-specific
* optimization business logic in the subclass. Then implement the subclass in the aggregator to provide the
* data needed for the optimization.
*
* @opensearch.internal
*/
public abstract class AggregatorBridge {

/**
* The field type associated with this aggregator bridge.
*/
MappedFieldType fieldType;

Consumer<Ranges> setRanges;

void setRangesConsumer(Consumer<Ranges> setRanges) {
this.setRanges = setRanges;
}

/**
* Checks whether the aggregator can be optimized.
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there
*
* @return {@code true} if the aggregator can be optimized, {@code false} otherwise.
* The result will be saved in the optimization context.
*/
protected abstract boolean canOptimize();

/**
* Prepares the optimization at the shard level, after the aggregator has been checked as optimizable.
* <p>
* For example, figure out the ranges from the aggregation to use for the optimization later.
* <p>
* This method is supposed to be implemented in a specific aggregator to take in fields from there.
*/
protected abstract void prepare() throws IOException;

/**
* Prepares the optimization for a specific segment when the segment functionally matches all docs.
*
* @param leaf the leaf reader context for the segment
*/
abstract Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;

/**
* Attempts to build aggregation results for a segment
*
* @param values the point values (index structure for numeric values) for a segment
* @param incrementDocCount a consumer to increment the document count for a range bucket. The first parameter is the document count, the second is the key of the bucket
* @param ranges            the ranges to match the point values against
*/
abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
PointValues values,
BiConsumer<Long, Long> incrementDocCount,
Ranges ranges
) throws IOException;
}
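
A simplified, self-contained illustration of the pattern this class describes; the type and method names below are stand-ins, not the real OpenSearch classes (Ranges, DebugInfo, and the point-tree traversal are omitted or stubbed):

// The aggregator supplies data through a concrete bridge; the optimization
// context owns the decision flow and calls back into the bridge.
abstract class BridgeSketch {
    // Aggregator-specific check: can this aggregation be rewritten into range filters?
    protected abstract boolean canOptimize();

    // Shard-level preparation, e.g. building the ranges to match against.
    protected abstract void prepare();

    // Per-segment attempt; returns true if the segment was fully handled here.
    protected abstract boolean tryOptimize();
}

final class OptimizationContextSketch {
    private final BridgeSketch bridge;
    private final boolean canOptimize;
    private boolean prepared = false;

    OptimizationContextSketch(BridgeSketch bridge) {
        this.bridge = bridge;
        this.canOptimize = bridge.canOptimize(); // decided once per shard
    }

    // Called per segment; when it returns false the aggregator falls back to the normal collect path.
    boolean tryOptimize() {
        if (!canOptimize) return false;
        if (!prepared) {
            bridge.prepare();
            prepared = true;
        }
        return bridge.tryOptimize();
    }
}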
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket.filterrewrite;

import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;

/**
* Bridge for composite aggregations, which can be optimized only when there is a single date histogram source
*/
public abstract class CompositeAggregatorBridge extends DateHistogramAggregatorBridge {
protected boolean canOptimize(CompositeValuesSourceConfig[] sourceConfigs) {
if (sourceConfigs.length != 1 || !(sourceConfigs[0].valuesSource() instanceof RoundingValuesSource)) return false;
return canOptimize(sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript(), sourceConfigs[0].fieldType());
}

private boolean canOptimize(boolean missing, boolean hasScript, MappedFieldType fieldType) {
if (!missing && !hasScript) {
if (fieldType instanceof DateFieldMapper.DateFieldType) {
if (fieldType.isSearchable()) {
this.fieldType = fieldType;
return true;
}
}
}
return false;
}
}
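
For reference, a hedged example of a composite aggregation that should satisfy these checks, assuming the standard builder API (a single date_histogram source on a searchable date field, with no script and no missing bucket):

import java.util.List;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;

// "@timestamp" is an assumed field name used purely for illustration.
DateHistogramValuesSourceBuilder dateSource = new DateHistogramValuesSourceBuilder("date")
    .field("@timestamp")
    .calendarInterval(DateHistogramInterval.DAY);
List<CompositeValuesSourceBuilder<?>> sources = List.of(dateSource);
CompositeAggregationBuilder composite = new CompositeAggregationBuilder("histo", sources);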
