Fixing unit tests
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
bowenlan-amzn committed Dec 7, 2023
1 parent d1674eb commit 2dc0271
Showing 5 changed files with 67 additions and 61 deletions.
@@ -128,15 +128,21 @@ private static Weight[] createFilterForAggregations(
         final Rounding.Prepared preparedRounding,
         final String field,
         final DateFieldMapper.DateFieldType fieldType,
-        final long low,
-        final long high
+        long low,
+        final long high,
+        long afterKey
     ) throws IOException {
         final OptionalLong intervalOpt = Rounding.getInterval(rounding);
         if (intervalOpt.isEmpty()) {
             return null;
         }

         final long interval = intervalOpt.getAsLong();
+        // afterKey is the last bucket key in the previous response, while a bucket key
+        // is the start of its bucket's values, so add one interval
+        if (afterKey != 0) {
+            low = afterKey + interval;
+        }
         // Calculate the number of buckets using range and interval
         long roundedLow = preparedRounding.round(fieldType.convertNanosToMillis(low));
         long prevRounded = roundedLow;
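The adjustment above can be checked in isolation. A minimal, runnable sketch (the AfterKeyBoundsSketch class is hypothetical, not part of this commit), assuming a fixed one-day interval in epoch milliseconds:

public class AfterKeyBoundsSketch {
    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    // A bucket key marks the start of its bucket, so the next page of buckets
    // begins one interval after the afterKey returned by the previous page.
    static long adjustLowerBound(long low, long afterKey, long interval) {
        return afterKey != 0 ? afterKey + interval : low;
    }

    public static void main(String[] args) {
        long low = 1474329600000L;      // 2016-09-20T00:00:00Z, e.g. the segment minimum
        long afterKey = 1508371200000L; // 2017-10-19T00:00:00Z, last bucket key of the previous page
        long adjusted = adjustLowerBound(low, afterKey, DAY_MILLIS);
        System.out.println(adjusted);   // prints 1508457600000, i.e. 2017-10-20T00:00:00Z
    }
}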
@@ -224,7 +230,8 @@ public static FilterContext buildFastFilterContext(
                 fieldName,
                 (DateFieldMapper.DateFieldType) fieldType,
                 bounds[0],
-                bounds[1]
+                bounds[1],
+                valueSourceContext.afterKey
             );
             return new FilterContext((DateFieldMapper.DateFieldType) fieldType, filters);
         }
@@ -240,16 +247,19 @@ public static class ValueSourceContext {
         private final boolean missing;
         private final boolean hasScript;
         private final MappedFieldType fieldType;
+        private final long afterKey;

         /**
          * @param missing whether missing value/bucket is set
          * @param hasScript whether script is used
          * @param fieldType null if the field doesn't exist
+         * @param afterKey the last bucket key of the previous response, or 0 if there is none
          */
-        public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType) {
+        public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType, long afterKey) {
             this.missing = missing;
             this.hasScript = hasScript;
             this.fieldType = fieldType;
+            this.afterKey = afterKey;
         }

         public MappedFieldType getFieldType() {
@@ -116,10 +116,10 @@ final class CompositeAggregator extends BucketsAggregator {

     private boolean earlyTerminated;

-    private final Weight[] filters;
-    private final LongKeyedBucketOrds bucketOrds;
-    private final DateFieldMapper.DateFieldType fieldType;
-    private final Rounding.Prepared preparedRounding;
+    private Weight[] filters = null;
+    private LongKeyedBucketOrds bucketOrds = null;
+    private DateFieldMapper.DateFieldType fieldType = null;
+    private Rounding.Prepared preparedRounding = null;

     CompositeAggregator(
         String name,
@@ -165,51 +165,48 @@ final class CompositeAggregator extends BucketsAggregator {
         this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
         this.rawAfterKey = rawAfterKey;

-        bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
-
-        CompositeValuesSourceConfig dateHistogramSourceConfig = sourceConfigs[0];
-        RoundingValuesSource dateHistogramSource = (RoundingValuesSource) dateHistogramSourceConfig.valuesSource();
-        preparedRounding = dateHistogramSource.getPreparedRounding();
-        FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
-            dateHistogramSourceConfig.missingBucket(),
-            dateHistogramSourceConfig.hasScript(),
-            dateHistogramSourceConfig.fieldType()
-        );
-        FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
-            parent,
-            subAggregators.length,
-            context,
-            x -> dateHistogramSource.getRounding(),
-            () -> preparedRounding,
-            dateHistogramSourceContext,
-            // TODO reading need to consider afterKey in this
-            fc -> FilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name())
-        );
-        if (filterContext != null) {
-            fieldType = filterContext.fieldType;
-            filters = filterContext.filters;
-        } else {
-            filters = null;
-            fieldType = null;
-        }
+        // Try fast filter optimization
+        if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) {
+            RoundingValuesSource dateHistogramSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
+            bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
+            preparedRounding = dateHistogramSource.getPreparedRounding();
+            long afterValue = 0;
+            if (rawAfterKey != null) {
+                assert rawAfterKey.size() == 1 && formats.size() == 1;
+                afterValue = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
+                    throw new IllegalArgumentException("now() is not supported in [after] key");
+                });
+            }
+            FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
+                sourceConfigs[0].missingBucket(),
+                sourceConfigs[0].hasScript(),
+                sourceConfigs[0].fieldType(),
+                afterValue
+            );
+            FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
+                parent,
+                subAggregators.length,
+                context,
+                x -> dateHistogramSource.getRounding(),
+                () -> preparedRounding,
+                dateHistogramSourceContext,
+                fc -> FilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name())
+            );
+            if (filterContext != null) {
+                fieldType = filterContext.fieldType;
+                filters = filterContext.filters;
+            } else {
+                filters = null;
+                fieldType = null;
+            }
+        }
     }

-    // private long[] computeBounds(final FilterRewriteHelper.ValueSourceContext fieldContext) {
-    //     final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
-    //     if (bounds != null) {
-    //         // Update min/max limit if user specified any hard bounds
-    //         if (hardBounds != null) {
-    //             bounds[0] = Math.max(bounds[0], hardBounds.getMin());
-    //             bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
-    //         }
-    //     }
-    //     return bounds;
-    // }
-
     @Override
     protected void doClose() {
         try {
             Releasables.close(queue);
             Releasables.close(bucketOrds);
         } finally {
             Releasables.close(sources);
         }
@@ -261,8 +258,8 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
             );
         }

-        // For the fast filters optimization
-        if (bucketOrds.size() != 0) {
+        // Fast filters optimization
+        if (bucketOrds != null) {
             Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
             for (InternalComposite.InternalBucket internalBucket : buckets) {
                 bucketMap.put(internalBucket.getRawKey(), internalBucket);
Expand Down Expand Up @@ -549,22 +546,22 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket

finishLeaf();

boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; // TODO reading subAggs are deferred
boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;

Sort indexSortPrefix = buildIndexSortPrefix(ctx);
int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); // TODO reading asc index sort exists
int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);

// are there index sort enabled? sortPrefixLen
SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0
? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) // TODO reading only using the first field
? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query())
: null;
if (sortedDocsProducer != null) {
// Visit documents sorted by the leading source of the composite definition and terminates
// when the leading source value is guaranteed to be greater than the lowest composite bucket
// in the queue.
DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet);
if (fillDocIdSet) {
entries.add(new Entry(ctx, docIdSet)); // TODO reading add entries
entries.add(new Entry(ctx, docIdSet));
}
// We can bypass search entirely for this segment, the processing is done in the previous call.
// Throwing this exception will terminate the execution of the search for this root aggregation,
@@ -635,7 +632,7 @@ private void runDeferredCollections() throws IOException {

         deferredCollectors.preCollection();

-        for (Entry entry : entries) { // TODO reading entry is the leaf
+        for (Entry entry : entries) {
             DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
             if (docIdSetIterator == null) {
                 continue;
@@ -680,7 +677,7 @@ public void collect(int doc, long zeroBucket) throws IOException {
                     if (slot != null) {
                         // The candidate key is a top bucket.
                         // We can defer the collection of this document/bucket to the sub collector
-                        subCollector.collect(doc, slot); // TODO reading slot is the same as owning bucket ordinal
+                        subCollector.collect(doc, slot);
                     }
                 }
             };
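The constructor and buildAggregations changes above replace a size check with a null check: filters, bucketOrds, fieldType, and preparedRounding are now initialized only when the aggregation has a single date-histogram source, so a null bucketOrds means the fast-filter path was never set up. A small, runnable sketch of that sentinel pattern (the OptimizationGuardSketch class is hypothetical, not part of this commit):

public class OptimizationGuardSketch {
    private long[] bucketOrds = null; // stand-in for LongKeyedBucketOrds

    void prepare(boolean singleDateHistogramSource) {
        // Mirrors the constructor: state is built only on the optimized path
        if (singleDateHistogramSource) {
            bucketOrds = new long[8];
        }
    }

    boolean optimizationApplied() {
        return bucketOrds != null; // replaces the earlier bucketOrds.size() != 0 check
    }

    public static void main(String[] args) {
        OptimizationGuardSketch guard = new OptimizationGuardSketch();
        guard.prepare(false);
        System.out.println(guard.optimizationApplied()); // false
        guard.prepare(true);
        System.out.println(guard.optimizationApplied()); // true
    }
}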
@@ -161,8 +161,8 @@ private AutoDateHistogramAggregator(
         FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
             valuesSourceConfig.missing() != null,
             valuesSourceConfig.script() != null,
-            valuesSourceConfig.fieldType()
-        );
+            valuesSourceConfig.fieldType(),
+            0);
         FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
             parent(),
             subAggregators.length,
@@ -120,8 +120,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
         FilterRewriteHelper.ValueSourceContext dateHistogramSourceContext = new FilterRewriteHelper.ValueSourceContext(
             valuesSourceConfig.missing() != null,
             valuesSourceConfig.script() != null,
-            valuesSourceConfig.fieldType()
-        );
+            valuesSourceConfig.fieldType(),
+            0);
         FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
             parent,
             subAggregators.length,
@@ -1279,7 +1279,7 @@ public void testWithDateHistogram() throws IOException {
             },
             (result) -> {
                 assertEquals(3, result.getBuckets().size());
-                assertEquals("{date=1508457600000}", result.afterKey().toString());
+                assertEquals("{date=1508457600000}", result.afterKey().toString()); // 2017-10-20T00:00:00
                 assertEquals("{date=1474329600000}", result.getBuckets().get(0).getKeyAsString());
                 assertEquals(2L, result.getBuckets().get(0).getDocCount());
                 assertEquals("{date=1508371200000}", result.getBuckets().get(1).getKeyAsString());
@@ -1300,9 +1300,8 @@ public void testWithDateHistogram() throws IOException {
             DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date").field("date")
                 .calendarInterval(DateHistogramInterval.days(1));
             return new CompositeAggregationBuilder("name", Collections.singletonList(histo)).aggregateAfter(
-                createAfterKey("date", 1474329600000L)
+                createAfterKey("date", 1474329600000L) // 2016-09-20T00:00:00
             );
-
         },
         (result) -> {
             assertEquals(2, result.getBuckets().size());
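The epoch-millisecond comments added to the assertions can be verified with java.time (a standalone snippet, not part of this commit):

import java.time.Instant;

public class EpochCommentCheck {
    public static void main(String[] args) {
        System.out.println(Instant.ofEpochMilli(1508457600000L)); // 2017-10-20T00:00:00Z
        System.out.println(Instant.ofEpochMilli(1474329600000L)); // 2016-09-20T00:00:00Z
    }
}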
