From 9a0a69fb530732e49834a2f2ddc3011cca82b26c Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 7 Feb 2024 10:20:00 -0800 Subject: [PATCH] Apply fast date histogram optimization at the segment level (#12073) --------- Signed-off-by: bowenlan-amzn --- .../aggregations/bucket/DateHistogramIT.java | 4 +- .../bucket/FastFilterRewriteHelper.java | 360 ++++++++++++------ .../bucket/composite/CompositeAggregator.java | 61 ++- .../AutoDateHistogramAggregator.java | 61 +-- .../histogram/DateHistogramAggregator.java | 33 +- .../composite/CompositeAggregatorTests.java | 69 ++++ .../DateHistogramAggregatorTests.java | 208 +++++++++- .../BaseCompositeAggregatorTestCase.java | 8 + 8 files changed, 624 insertions(+), 180 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java index 64c9c792b866a..6a15490cbfe63 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/DateHistogramIT.java @@ -177,9 +177,9 @@ public void setupSuiteScopeCluster() throws Exception { indexDoc(2, 15, 3), // date: Feb 15, dates: Feb 15, Mar 16 indexDoc(3, 2, 4), // date: Mar 2, dates: Mar 2, Apr 3 indexDoc(3, 15, 5), // date: Mar 15, dates: Mar 15, Apr 16 - indexDoc(3, 23, 6) + indexDoc(3, 23, 6) // date: Mar 23, dates: Mar 23, Apr 24 ) - ); // date: Mar 23, dates: Mar 23, Apr 24 + ); indexRandom(true, builders); ensureSearchable(); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index f377287d0b3bd..6f1cc901e2d82 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -8,9 +8,15 @@ package org.opensearch.search.aggregations.bucket; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.ConstantScoreQuery; +import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.IndexOrDocValuesQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.PointRangeQuery; @@ -18,16 +24,15 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Weight; import org.apache.lucene.util.NumericUtils; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.Rounding; import org.opensearch.common.lucene.search.function.FunctionScoreQuery; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.DateRangeIncludingNowQuery; -import org.opensearch.search.DocValueFormat; -import org.opensearch.search.aggregations.bucket.composite.CompositeKey; import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig; import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource; +import org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -37,7 +42,8 @@ import java.util.OptionalLong; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.function.Supplier; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; /** * Utility class to help rewrite aggregations into filters. @@ -55,6 +61,8 @@ public final class FastFilterRewriteHelper { private FastFilterRewriteHelper() {} + private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class); + private static final int MAX_NUM_FILTER_BUCKETS = 1024; private static final Map, Function> queryWrappers; @@ -80,13 +88,13 @@ private static Query unwrapIntoConcreteQuery(Query query) { } /** - * Finds the min and max bounds of field values for the shard + * Finds the global min and max bounds of the field for the shard across all segments + * + * @return null if the field is empty or not indexed */ - private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException { + private static long[] getShardBounds(final SearchContext context, final String fieldName) throws IOException { final List leaves = context.searcher().getIndexReader().leaves(); long min = Long.MAX_VALUE, max = Long.MIN_VALUE; - // Since the query does not specify bounds for aggregation, we can - // build the global min/max from local min/max within each segment for (LeafReaderContext leaf : leaves) { final PointValues values = leaf.reader().getPointValues(fieldName); if (values != null) { @@ -95,51 +103,80 @@ private static long[] getIndexBounds(final SearchContext context, final String f } } - if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) return null; + if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) { + return null; + } + return new long[] { min, max }; + } + /** + * Finds the min and max bounds of the field for the segment + * + * @return null if the field is empty or not indexed + */ + private static long[] getSegmentBounds(final LeafReaderContext context, final String fieldName) throws IOException { + long min = Long.MAX_VALUE, max = Long.MIN_VALUE; + final PointValues values = context.reader().getPointValues(fieldName); + if (values != null) { + min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0)); + max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0)); + } + + if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) { + return null; + } return new long[] { min, max }; } /** - * This method also acts as a pre-condition check for the optimization, - * returns null if the optimization cannot be applied + * This method also acts as a pre-condition check for the optimization + * + * @return null if the processed query not as expected */ - public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException { + public static long[] getDateHistoAggBounds(final SearchContext context, final String fieldName) throws IOException { final Query cq = unwrapIntoConcreteQuery(context.query()); - final long[] indexBounds = getIndexBounds(context, fieldName); if (cq instanceof PointRangeQuery) { final PointRangeQuery prq = (PointRangeQuery) cq; - // Ensure that the query and aggregation are on the same field - if (prq.getField().equals(fieldName)) { - return new long[] { - // Minimum bound for aggregation is the max between query and global - Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]), - // Maximum bound for aggregation is the min between query and global - Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]) }; - } + final long[] indexBounds = getShardBounds(context, fieldName); + if (indexBounds == null) return null; + return getBoundsWithRangeQuery(prq, fieldName, indexBounds); } else if (cq instanceof MatchAllDocsQuery) { - return indexBounds; + return getShardBounds(context, fieldName); + } else if (cq instanceof FieldExistsQuery) { + // when a range query covers all values of a shard, it will be rewrite field exists query + if (((FieldExistsQuery) cq).getField().equals(fieldName)) { + return getShardBounds(context, fieldName); + } } - // Check if the top-level query (which may be a PRQ on another field) is functionally match-all - Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); - for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { - if (weight.count(ctx) != ctx.reader().numDocs()) { + + return null; + } + + private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldName, long[] indexBounds) { + // Ensure that the query and aggregation are on the same field + if (prq.getField().equals(fieldName)) { + // Minimum bound for aggregation is the max between query and global + long lower = Math.max(NumericUtils.sortableBytesToLong(prq.getLowerPoint(), 0), indexBounds[0]); + // Maximum bound for aggregation is the min between query and global + long upper = Math.min(NumericUtils.sortableBytesToLong(prq.getUpperPoint(), 0), indexBounds[1]); + if (lower > upper) { return null; } + return new long[] { lower, upper }; } - return indexBounds; + + return null; } /** * Creates the date range filters for aggregations using the interval, min/max - * bounds and the rounding values + * bounds and prepared rounding */ private static Weight[] createFilterForAggregations( final SearchContext context, + final DateFieldMapper.DateFieldType fieldType, final long interval, final Rounding.Prepared preparedRounding, - final String field, - final DateFieldMapper.DateFieldType fieldType, long low, final long high ) throws IOException { @@ -149,7 +186,10 @@ private static Weight[] createFilterForAggregations( int bucketCount = 0; while (roundedLow <= fieldType.convertNanosToMillis(high)) { bucketCount++; - if (bucketCount > MAX_NUM_FILTER_BUCKETS) return null; + if (bucketCount > MAX_NUM_FILTER_BUCKETS) { + logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS); + return null; + } // Below rounding is needed as the interval could return in // non-rounded values for something like calendar month roundedLow = preparedRounding.round(roundedLow + interval); @@ -176,10 +216,10 @@ private static Weight[] createFilterForAggregations( // is included in the next bucket fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0); - filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) { + filters[i++] = context.searcher().createWeight(new PointRangeQuery(fieldType.name(), lower, upper, 1) { @Override protected String toString(int dimension, byte[] value) { - return null; + return Long.toString(LongPoint.decodeDimension(value, 0)); } }, ScoreMode.COMPLETE_NO_SCORES, 1); } @@ -189,16 +229,24 @@ protected String toString(int dimension, byte[] value) { } /** - * Context object to do fast filter optimization + * Context object for fast filter optimization + *

+ * Usage: first set aggregation type, then check isRewriteable, then buildFastFilter */ public static class FastFilterContext { + private boolean rewriteable = false; private Weight[] filters = null; - public AggregationType aggregationType; + private boolean filtersBuiltAtShardLevel = false; - public FastFilterContext() {} + private AggregationType aggregationType; + private final SearchContext context; - private void setFilters(Weight[] filters) { - this.filters = filters; + public FastFilterContext(SearchContext context) { + this.context = context; + } + + public AggregationType getAggregationType() { + return aggregationType; } public void setAggregationType(AggregationType aggregationType) { @@ -206,119 +254,145 @@ public void setAggregationType(AggregationType aggregationType) { } public boolean isRewriteable(final Object parent, final int subAggLength) { - return aggregationType.isRewriteable(parent, subAggLength); + boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength); + logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId()); + this.rewriteable = rewriteable; + return rewriteable; } - /** - * This filter build method is for date histogram aggregation type - * - * @param computeBounds get the lower and upper bound of the field in a shard search - * @param roundingFunction produce Rounding that contains interval of date range. - * Rounding is computed dynamically using the bounds in AutoDateHistogram - * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time - */ - public void buildFastFilter( - SearchContext context, - CheckedFunction computeBounds, - Function roundingFunction, - Supplier preparedRoundingSupplier - ) throws IOException { - assert this.aggregationType instanceof DateHistogramAggregationType; - DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; - DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); - final long[] bounds = computeBounds.apply(aggregationType); - if (bounds == null) return; - - final Rounding rounding = roundingFunction.apply(bounds); - final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) return; - final long interval = intervalOpt.getAsLong(); - - // afterKey is the last bucket key in previous response, while the bucket key - // is the start of the bucket values, so add the interval - if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { - bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; + public void buildFastFilter() throws IOException { + assert filters == null : "Filters should only be built once, but they are already built"; + this.filters = this.aggregationType.buildFastFilter(context); + if (filters != null) { + logger.debug("Fast filter built for shard {}", context.indexShard().shardId()); + filtersBuiltAtShardLevel = true; } + } - final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( - context, - interval, - preparedRoundingSupplier.get(), - fieldType.name(), - fieldType, - bounds[0], - bounds[1] - ); - this.setFilters(filters); + public void buildFastFilter(LeafReaderContext leaf) throws IOException { + assert filters == null : "Filters should only be built once, but they are already built"; + this.filters = this.aggregationType.buildFastFilter(leaf, context); + if (filters != null) { + logger.debug("Fast filter built for shard {} segment {}", context.indexShard().shardId(), leaf.ord); + } } } /** * Different types have different pre-conditions, filter building logic, etc. */ - public interface AggregationType { + interface AggregationType { + boolean isRewriteable(Object parent, int subAggLength); + + Weight[] buildFastFilter(SearchContext ctx) throws IOException; + + Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext ctx) throws IOException; + + default int getSize() { + return Integer.MAX_VALUE; + } } /** * For date histogram aggregation */ - public static class DateHistogramAggregationType implements AggregationType { + public static abstract class AbstractDateHistogramAggregationType implements AggregationType { private final MappedFieldType fieldType; private final boolean missing; private final boolean hasScript; + private LongBounds hardBounds; - public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { + public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { this.fieldType = fieldType; this.missing = missing; this.hasScript = hasScript; } + public AbstractDateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { + this(fieldType, missing, hasScript); + this.hardBounds = hardBounds; + } + @Override public boolean isRewriteable(Object parent, int subAggLength) { if (parent == null && subAggLength == 0 && !missing && !hasScript) { - return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; + if (fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType) { + return fieldType.isSearchable(); + } } return false; } - public DateFieldMapper.DateFieldType getFieldType() { - assert fieldType instanceof DateFieldMapper.DateFieldType; - return (DateFieldMapper.DateFieldType) fieldType; + @Override + public Weight[] buildFastFilter(SearchContext context) throws IOException { + long[] bounds = getDateHistoAggBounds(context, fieldType.name()); + logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId()); + return buildFastFilter(context, bounds); } - } - /** - * For composite aggregation with date histogram as a source - */ - public static class CompositeAggregationType extends DateHistogramAggregationType { - private final RoundingValuesSource valuesSource; - private long afterKey = -1L; - private final int size; - - public CompositeAggregationType( - CompositeValuesSourceConfig[] sourceConfigs, - CompositeKey rawAfterKey, - List formats, - int size - ) { - super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); - this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); - this.size = size; - if (rawAfterKey != null) { - assert rawAfterKey.size() == 1 && formats.size() == 1; - this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { - throw new IllegalArgumentException("now() is not supported in [after] key"); - }); + @Override + public Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext context) throws IOException { + long[] bounds = getSegmentBounds(leaf, fieldType.name()); + logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord); + return buildFastFilter(context, bounds); + } + + private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IOException { + bounds = processHardBounds(bounds); + logger.debug("Bounds are {} for shard {} with hard bound", bounds, context.indexShard().shardId()); + if (bounds == null) { + return null; + } + assert bounds[0] <= bounds[1] : "Low bound should be less than high bound"; + + final Rounding rounding = getRounding(bounds[0], bounds[1]); + final OptionalLong intervalOpt = Rounding.getInterval(rounding); + if (intervalOpt.isEmpty()) { + return null; } + final long interval = intervalOpt.getAsLong(); + + // process the after key of composite agg + processAfterKey(bounds, interval); + + return FastFilterRewriteHelper.createFilterForAggregations( + context, + (DateFieldMapper.DateFieldType) fieldType, + interval, + getRoundingPrepared(), + bounds[0], + bounds[1] + ); } - public Rounding getRounding() { - return valuesSource.getRounding(); + protected abstract Rounding getRounding(final long low, final long high); + + protected abstract Rounding.Prepared getRoundingPrepared(); + + protected void processAfterKey(long[] bound, long interval) {} + + protected long[] processHardBounds(long[] bounds) { + if (bounds != null) { + // Update min/max limit if user specified any hard bounds + if (hardBounds != null) { + if (hardBounds.getMin() > bounds[0]) { + bounds[0] = hardBounds.getMin(); + } + if (hardBounds.getMax() - 1 < bounds[1]) { + bounds[1] = hardBounds.getMax() - 1; // hard bounds max is exclusive + } + if (bounds[0] > bounds[1]) { + return null; + } + } + } + return bounds; } - public Rounding.Prepared getRoundingPreparer() { - return valuesSource.getPreparedRounding(); + public DateFieldMapper.DateFieldType getFieldType() { + assert fieldType instanceof DateFieldMapper.DateFieldType; + return (DateFieldMapper.DateFieldType) fieldType; } } @@ -335,7 +409,9 @@ public static long getBucketOrd(long bucketOrd) { } /** - * This is executed for each segment by passing the leaf reader context + * Try to get the bucket doc counts from the fast filters for the aggregation + *

+ * Usage: invoked at segment level — in getLeafCollector of aggregator * * @param incrementDocCount takes in the bucket key value and the bucket count */ @@ -345,9 +421,39 @@ public static boolean tryFastFilterAggregation( final BiConsumer incrementDocCount ) throws IOException { if (fastFilterContext == null) return false; - if (fastFilterContext.filters == null) return false; + if (!fastFilterContext.rewriteable) { + return false; + } + + NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME); + if (docCountValues.nextDoc() != NO_MORE_DOCS) { + logger.debug( + "Shard {} segment {} has at least one document with _doc_count field, skip fast filter optimization", + fastFilterContext.context.indexShard().shardId(), + ctx.ord + ); + return false; + } + + // if no filters built at shard level (see getDateHistoAggBounds method for possible reasons) + // check if the query is functionally match-all at segment level + if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) { + return false; + } + Weight[] filters = fastFilterContext.filters; + if (filters == null) { + logger.debug( + "Shard {} segment {} functionally match all documents. Build the fast filter", + fastFilterContext.context.indexShard().shardId(), + ctx.ord + ); + fastFilterContext.buildFastFilter(ctx); + filters = fastFilterContext.filters; + if (filters == null) { + return false; + } + } - final Weight[] filters = fastFilterContext.filters; final int[] counts = new int[filters.length]; int i; for (i = 0; i < filters.length; i++) { @@ -360,26 +466,34 @@ public static boolean tryFastFilterAggregation( } int s = 0; - int size = Integer.MAX_VALUE; + int size = fastFilterContext.aggregationType.getSize(); for (i = 0; i < filters.length; i++) { if (counts[i] > 0) { long bucketKey = i; // the index of filters is the key for filters aggregation - if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) { - final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType) - .getFieldType(); + if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) { + final DateFieldMapper.DateFieldType fieldType = + ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType(); bucketKey = fieldType.convertNanosToMillis( NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ); - if (fastFilterContext.aggregationType instanceof CompositeAggregationType) { - size = ((CompositeAggregationType) fastFilterContext.aggregationType).size; - } } incrementDocCount.accept(bucketKey, counts[i]); s++; - if (s > size) return true; + if (s > size) { + logger.debug("Fast filter optimization applied to composite aggregation with size {}", size); + return true; + } } } + logger.debug("Fast filter optimization applied"); return true; } + + private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException { + Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + assert weight != null; + int count = weight.count(leafCtx); + return count > 0 && count == leafCtx.reader().numDocs(); + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 822b8a6c4b118..b97c814cdf645 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -164,24 +164,55 @@ final class CompositeAggregator extends BucketsAggregator { this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return; - fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size) - ); + fastFilterContext.setAggregationType(new CompositeAggregationType()); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - // bucketOrds is the data structure for saving date histogram results + // bucketOrds is used for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); - // Currently the filter rewrite is only supported for date histograms - FastFilterRewriteHelper.CompositeAggregationType aggregationType = - (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; - preparedRounding = aggregationType.getRoundingPreparer(); - fastFilterContext.buildFastFilter( - context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), - x -> aggregationType.getRounding(), - () -> preparedRounding - ); + preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared(); + fastFilterContext.buildFastFilter(); + } + } + + /** + * Currently the filter rewrite is only supported for date histograms + */ + private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + private final RoundingValuesSource valuesSource; + private long afterKey = -1L; + + public CompositeAggregationType() { + super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); + this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); + if (rawAfterKey != null) { + assert rawAfterKey.size() == 1 && formats.size() == 1; + this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { + throw new IllegalArgumentException("now() is not supported in [after] key"); + }); + } + } + + public Rounding getRounding(final long low, final long high) { + return valuesSource.getRounding(); + } + + public Rounding.Prepared getRoundingPrepared() { + return valuesSource.getPreparedRounding(); + } + + @Override + protected void processAfterKey(long[] bound, long interval) { + // afterKey is the last bucket key in previous response, and the bucket key + // is the minimum of all values in the bucket, so need to add the interval + if (afterKey != -1L) { + bound[0] = afterKey + interval; + } + } + + @Override + public int getSize() { + return size; } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 0ea820abbedf4..12aefc540e75c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -42,6 +42,7 @@ import org.opensearch.common.util.IntArray; import org.opensearch.common.util.LongArray; import org.opensearch.core.common.util.ByteArray; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -156,45 +157,53 @@ private AutoDateHistogramAggregator( this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.DateHistogramAggregationType( + new AutoHistogramAggregationType( valuesSourceConfig.fieldType(), valuesSourceConfig.missing() != null, valuesSourceConfig.script() != null ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter( - context, - fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), - b -> getMinimumRounding(b[0], b[1]), - // Passing prepared rounding as supplier to ensure the correct prepared - // rounding is set as it is done during getMinimumRounding - () -> preparedRounding - ); + fastFilterContext.buildFastFilter(); } } - private Rounding getMinimumRounding(final long low, final long high) { - // max - min / targetBuckets = bestDuration - // find the right innerInterval this bestDuration belongs to - // since we cannot exceed targetBuckets, bestDuration should go up, - // so the right innerInterval should be an upper bound - long bestDuration = (high - low) / targetBuckets; - while (roundingIdx < roundingInfos.length - 1) { - final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; - final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; - // If the interval duration is covered by the maximum inner interval, - // we can start with this outer interval for creating the buckets - if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { - break; + private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + + public AutoHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { + super(fieldType, missing, hasScript); + } + + @Override + protected Rounding getRounding(final long low, final long high) { + // max - min / targetBuckets = bestDuration + // find the right innerInterval this bestDuration belongs to + // since we cannot exceed targetBuckets, bestDuration should go up, + // so the right innerInterval should be an upper bound + long bestDuration = (high - low) / targetBuckets; + // reset so this function is idempotent + roundingIdx = 0; + while (roundingIdx < roundingInfos.length - 1) { + final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx]; + final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1]; + // If the interval duration is covered by the maximum inner interval, + // we can start with this outer interval for creating the buckets + if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) { + break; + } + roundingIdx++; } - roundingIdx++; + + preparedRounding = prepareRounding(roundingIdx); + return roundingInfos[roundingIdx].rounding; } - preparedRounding = prepareRounding(roundingIdx); - return roundingInfos[roundingIdx].rounding; + @Override + protected Prepared getRoundingPrepared() { + return preparedRounding; + } } protected abstract LongKeyedBucketOrds getBucketOrds(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index b95bd093b82a6..0e830106c8284 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -39,6 +39,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.Rounding; import org.opensearch.common.lease.Releasables; +import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -115,29 +116,35 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context); fastFilterContext.setAggregationType( - new FastFilterRewriteHelper.DateHistogramAggregationType( + new DateHistogramAggregationType( valuesSourceConfig.fieldType(), valuesSourceConfig.missing() != null, - valuesSourceConfig.script() != null + valuesSourceConfig.script() != null, + hardBounds ) ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding); + fastFilterContext.buildFastFilter(); } } - private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { - final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); - if (bounds != null) { - // Update min/max limit if user specified any hard bounds - if (hardBounds != null) { - bounds[0] = Math.max(bounds[0], hardBounds.getMin()); - bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive - } + private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType { + + public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) { + super(fieldType, missing, hasScript, hardBounds); + } + + @Override + protected Rounding getRounding(long low, long high) { + return rounding; + } + + @Override + protected Rounding.Prepared getRoundingPrepared() { + return preparedRounding; } - return bounds; } @Override diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index bbe27eb573b64..13a3d8145743b 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.Term; import org.apache.lucene.search.DocValuesFieldExistsQuery; +import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TermQuery; import org.opensearch.OpenSearchParseException; @@ -1256,6 +1257,74 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception { ); } + public void testDateHistogramSourceWithSize() throws IOException { + final List>> dataset = new ArrayList<>( + Arrays.asList( + createDocument("date", asLong("2017-10-20T03:08:45")), + createDocument("date", asLong("2016-09-20T09:00:34")), + createDocument("date", asLong("2016-09-20T11:34:00")), + createDocument("date", asLong("2017-10-20T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24")), + createDocument("long", 4L) + ) + ); + testSearchCase( + Arrays.asList( + new MatchAllDocsQuery(), + new FieldExistsQuery("date"), + LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24")) + ), + dataset, + () -> { + DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") + .calendarInterval(DateHistogramInterval.days(1)); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1); + }, + (result) -> { + assertEquals(1, result.getBuckets().size()); + assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2017-10-20T00:00:00 + assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + } + ); + } + + public void testDateHistogramSourceWithDocCountField() throws IOException { + final List>> dataset = new ArrayList<>( + Arrays.asList( + createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5), + createDocument("date", asLong("2016-09-20T09:00:34")), + createDocument("date", asLong("2016-09-20T11:34:00"), "_doc_count", 2), + createDocument("date", asLong("2017-10-20T06:09:24")), + createDocument("date", asLong("2017-10-19T06:09:24"), "_doc_count", 3), + createDocument("long", 4L) + ) + ); + testSearchCase( + Arrays.asList( + new MatchAllDocsQuery(), + new FieldExistsQuery("date"), + LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24")) + ), + dataset, + () -> { + DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date") + .calendarInterval(DateHistogramInterval.days(1)); + return new CompositeAggregationBuilder("name", Collections.singletonList(histo)); + }, + (result) -> { + assertEquals(3, result.getBuckets().size()); + assertEquals("{date=1508457600000}", result.afterKey().toString()); + assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(3L, result.getBuckets().get(0).getDocCount()); + assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(3L, result.getBuckets().get(1).getDocCount()); + assertEquals("{date=1508457600000}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(6L, result.getBuckets().get(2).getDocCount()); + } + ); + } + public void testWithDateHistogram() throws IOException { final List>> dataset = new ArrayList<>(); dataset.addAll( diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index bca6623e66104..2a4fbca7a8541 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -45,6 +46,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.opensearch.common.time.DateFormatters; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.bucket.terms.StringTerms; @@ -1178,6 +1180,181 @@ public void testOverlappingBounds() { ); } + public void testHardBoundsNotOverlapping() throws IOException { + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2018-01-01", "2020-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2016-01-01", "2017-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2016-01-01", "2017-02-03")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + false + ); + + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY) + .hardBounds(new LongBounds("2017-02-03", "2020-01-01")) + .field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }, + false + ); + } + + public void testFilterRewriteOptimizationWithRangeQuery() throws IOException { + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2018-01-01"), asLong("2020-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + 10000, + false, + false, + true // force AGGREGABLE_DATE field to be searchable to test the filter rewrite optimization path + ); + + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(0, buckets.size()); + }, + 10000, + false, + false, + true + ); + + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + 10000, + false, + false, + true + ); + + testSearchCase( + LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(3, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-03T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(3, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-04T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(0, bucket.getDocCount()); + + bucket = buckets.get(2); + assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(1, bucket.getDocCount()); + }, + 10000, + false, + false, + true + ); + } + + public void testDocCountField() throws IOException { + testSearchCase( + new MatchAllDocsQuery(), + Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02"), + aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE), + histogram -> { + List buckets = histogram.getBuckets(); + assertEquals(2, buckets.size()); + + Histogram.Bucket bucket = buckets.get(0); + assertEquals("2017-02-01T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(5, bucket.getDocCount()); + + bucket = buckets.get(1); + assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString()); + assertEquals(2, bucket.getDocCount()); + }, + 10000, + false, + true + ); + } + public void testIllegalInterval() throws IOException { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -1211,13 +1388,42 @@ private void testSearchCase( int maxBucket, boolean useNanosecondResolution ) throws IOException { - boolean aggregableDateIsSearchable = randomBoolean(); + testSearchCase(query, dataset, configure, verify, maxBucket, useNanosecondResolution, false); + } + + private void testSearchCase( + Query query, + List dataset, + Consumer configure, + Consumer verify, + int maxBucket, + boolean useNanosecondResolution, + boolean useDocCountField + ) throws IOException { + testSearchCase(query, dataset, configure, verify, maxBucket, useNanosecondResolution, useDocCountField, randomBoolean()); + } + + private void testSearchCase( + Query query, + List dataset, + Consumer configure, + Consumer verify, + int maxBucket, + boolean useNanosecondResolution, + boolean useDocCountField, + boolean aggregableDateIsSearchable + ) throws IOException { + logger.debug("Aggregable date is searchable {}", aggregableDateIsSearchable); DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(useNanosecondResolution, aggregableDateIsSearchable); try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { Document document = new Document(); + if (useDocCountField) { + // add the doc count field to the first document + document.add(new NumericDocValuesField(DocCountFieldMapper.NAME, 5)); + } for (String date : dataset) { long instant = asLong(date, fieldType); document.add(new SortedNumericDocValuesField(AGGREGABLE_DATE, instant)); diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java index 16abf2e255b5d..466e4d1bf1742 100644 --- a/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/search/aggregations/composite/BaseCompositeAggregatorTestCase.java @@ -14,6 +14,7 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StringField; @@ -40,6 +41,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.DateFieldMapper; +import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.DocumentMapper; import org.opensearch.index.mapper.IpFieldMapper; import org.opensearch.index.mapper.KeywordFieldMapper; @@ -204,6 +206,12 @@ protected void addToDocument(int id, Document doc, Map> key doc.add(new StringField("id", Integer.toString(id), Field.Store.NO)); for (Map.Entry> entry : keys.entrySet()) { final String name = entry.getKey(); + if (name.equals(DocCountFieldMapper.NAME)) { + doc.add(new IntPoint(name, (int) entry.getValue().get(0))); + // doc count field should be DocValuesType.NUMERIC + doc.add(new NumericDocValuesField(name, (int) entry.getValue().get(0))); + continue; + } for (Object value : entry.getValue()) { if (value instanceof Integer) { doc.add(new SortedNumericDocValuesField(name, (int) value));