Skip to content

Commit

Permalink
Apply fast date histogram optimization at the segment level (#12073)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn authored Feb 7, 2024
1 parent c0fca74 commit 9a0a69f
Show file tree
Hide file tree
Showing 8 changed files with 624 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ public void setupSuiteScopeCluster() throws Exception {
indexDoc(2, 15, 3), // date: Feb 15, dates: Feb 15, Mar 16
indexDoc(3, 2, 4), // date: Mar 2, dates: Mar 2, Apr 3
indexDoc(3, 15, 5), // date: Mar 15, dates: Mar 15, Apr 16
indexDoc(3, 23, 6)
indexDoc(3, 23, 6) // date: Mar 23, dates: Mar 23, Apr 24
)
); // date: Mar 23, dates: Mar 23, Apr 24
);
indexRandom(true, builders);
ensureSearchable();
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,55 @@ final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
fastFilterContext.setAggregationType(
new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
);
fastFilterContext.setAggregationType(new CompositeAggregationType());
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is the data structure for saving date histogram results
// bucketOrds is used for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
// Currently the filter rewrite is only supported for date histograms
FastFilterRewriteHelper.CompositeAggregationType aggregationType =
(FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
preparedRounding = aggregationType.getRoundingPreparer();
fastFilterContext.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
x -> aggregationType.getRounding(),
() -> preparedRounding
);
preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
fastFilterContext.buildFastFilter();
}
}

/**
* Currently the filter rewrite is only supported for date histograms
*/
private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;

public CompositeAggregationType() {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
}

public Rounding getRounding(final long low, final long high) {
return valuesSource.getRounding();
}

public Rounding.Prepared getRoundingPrepared() {
return valuesSource.getPreparedRounding();
}

@Override
protected void processAfterKey(long[] bound, long interval) {
// afterKey is the last bucket key in previous response, and the bucket key
// is the minimum of all values in the bucket, so need to add the interval
if (afterKey != -1L) {
bound[0] = afterKey + interval;
}
}

@Override
public int getSize() {
return size;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.util.IntArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.core.common.util.ByteArray;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -156,45 +157,53 @@ private AutoDateHistogramAggregator(
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
fastFilterContext.setAggregationType(
new FastFilterRewriteHelper.DateHistogramAggregationType(
new AutoHistogramAggregationType(
valuesSourceConfig.fieldType(),
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null
)
);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
fastFilterContext.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
b -> getMinimumRounding(b[0], b[1]),
// Passing prepared rounding as supplier to ensure the correct prepared
// rounding is set as it is done during getMinimumRounding
() -> preparedRounding
);
fastFilterContext.buildFastFilter();
}
}

private Rounding getMinimumRounding(final long low, final long high) {
// max - min / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
break;
private class AutoHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {

public AutoHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
super(fieldType, missing, hasScript);
}

@Override
protected Rounding getRounding(final long low, final long high) {
// max - min / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
// reset so this function is idempotent
roundingIdx = 0;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
break;
}
roundingIdx++;
}
roundingIdx++;

preparedRounding = prepareRounding(roundingIdx);
return roundingInfos[roundingIdx].rounding;
}

preparedRounding = prepareRounding(roundingIdx);
return roundingInfos[roundingIdx].rounding;
@Override
protected Prepared getRoundingPrepared() {
return preparedRounding;
}
}

protected abstract LongKeyedBucketOrds getBucketOrds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -115,29 +116,35 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
fastFilterContext.setAggregationType(
new FastFilterRewriteHelper.DateHistogramAggregationType(
new DateHistogramAggregationType(
valuesSourceConfig.fieldType(),
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null
valuesSourceConfig.script() != null,
hardBounds
)
);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding);
fastFilterContext.buildFastFilter();
}
}

private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException {
final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
}
private class DateHistogramAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {

public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript, LongBounds hardBounds) {
super(fieldType, missing, hasScript, hardBounds);
}

@Override
protected Rounding getRounding(long low, long high) {
return rounding;
}

@Override
protected Rounding.Prepared getRoundingPrepared() {
return preparedRounding;
}
return bounds;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldExistsQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.opensearch.OpenSearchParseException;
Expand Down Expand Up @@ -1256,6 +1257,74 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception {
);
}

public void testDateHistogramSourceWithSize() throws IOException {
final List<Map<String, List<Object>>> dataset = new ArrayList<>(
Arrays.asList(
createDocument("date", asLong("2017-10-20T03:08:45")),
createDocument("date", asLong("2016-09-20T09:00:34")),
createDocument("date", asLong("2016-09-20T11:34:00")),
createDocument("date", asLong("2017-10-20T06:09:24")),
createDocument("date", asLong("2017-10-19T06:09:24")),
createDocument("long", 4L)
)
);
testSearchCase(
Arrays.asList(
new MatchAllDocsQuery(),
new FieldExistsQuery("date"),
LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24"))
),
dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
.calendarInterval(DateHistogramInterval.days(1));
return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).size(1);
},
(result) -> {
assertEquals(1, result.getBuckets().size());
assertEquals("{date=1474329600000}", result.afterKey().toString()); // 2017-10-20T00:00:00
assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
}
);
}

public void testDateHistogramSourceWithDocCountField() throws IOException {
final List<Map<String, List<Object>>> dataset = new ArrayList<>(
Arrays.asList(
createDocument("date", asLong("2017-10-20T03:08:45"), "_doc_count", 5),
createDocument("date", asLong("2016-09-20T09:00:34")),
createDocument("date", asLong("2016-09-20T11:34:00"), "_doc_count", 2),
createDocument("date", asLong("2017-10-20T06:09:24")),
createDocument("date", asLong("2017-10-19T06:09:24"), "_doc_count", 3),
createDocument("long", 4L)
)
);
testSearchCase(
Arrays.asList(
new MatchAllDocsQuery(),
new FieldExistsQuery("date"),
LongPoint.newRangeQuery("date", asLong("2016-09-20T09:00:34"), asLong("2017-10-20T06:09:24"))
),
dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
.calendarInterval(DateHistogramInterval.days(1));
return new CompositeAggregationBuilder("name", Collections.singletonList(histo));
},
(result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{date=1508457600000}", result.afterKey().toString());
assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
assertEquals(3L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString());
assertEquals(3L, result.getBuckets().get(1).getDocCount());
assertEquals("{date=1508457600000}", result.getBuckets().get(2).getKeyAsString());
assertEquals(6L, result.getBuckets().get(2).getDocCount());
}
);
}

public void testWithDateHistogram() throws IOException {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Expand Down
Loading

0 comments on commit 9a0a69f

Please sign in to comment.