Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix composite aggregation failed test cases introduce by missing_order parameter (#1942) #2005

Merged
merged 2 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;
Expand All @@ -89,6 +90,7 @@ final class CompositeAggregator extends BucketsAggregator {
private final int size;
private final List<String> sourceNames;
private final int[] reverseMuls;
private final MissingOrder[] missingOrders;
private final List<DocValueFormat> formats;
private final CompositeKey rawAfterKey;

Expand Down Expand Up @@ -117,6 +119,7 @@ final class CompositeAggregator extends BucketsAggregator {
this.size = size;
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
this.sources = new SingleDimensionValuesSource[sourceConfigs.length];
// check that the provided size is not greater than the search.max_buckets setting
Expand Down Expand Up @@ -189,7 +192,15 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
CompositeKey key = queue.toCompositeKey(slot);
InternalAggregations aggs = subAggsForBuckets[slot];
int docCount = queue.getDocCount(slot);
buckets[queue.size()] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
buckets[queue.size()] = new InternalComposite.InternalBucket(
sourceNames,
formats,
key,
reverseMuls,
missingOrders,
docCount,
aggs
);
}
CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] {
Expand All @@ -201,14 +212,26 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
Arrays.asList(buckets),
lastBucket,
reverseMuls,
missingOrders,
earlyTerminated,
metadata()
) };
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, false, metadata());
return new InternalComposite(
name,
size,
sourceNames,
formats,
Collections.emptyList(),
null,
reverseMuls,
missingOrders,
false,
metadata()
);
}

private void finishLeaf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.util.BytesRef;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentBuilder;
Expand All @@ -43,6 +44,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.InternalMultiBucketAggregation;
import org.opensearch.search.aggregations.KeyComparable;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;

import java.io.IOException;
import java.util.AbstractMap;
Expand All @@ -64,6 +66,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
private final List<InternalBucket> buckets;
private final CompositeKey afterKey;
private final int[] reverseMuls;
private final MissingOrder[] missingOrders;
private final List<String> sourceNames;
private final List<DocValueFormat> formats;

Expand All @@ -77,6 +80,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
List<InternalBucket> buckets,
CompositeKey afterKey,
int[] reverseMuls,
MissingOrder[] missingOrders,
boolean earlyTerminated,
Map<String, Object> metadata
) {
Expand All @@ -87,6 +91,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
this.afterKey = afterKey;
this.size = size;
this.reverseMuls = reverseMuls;
this.missingOrders = missingOrders;
this.earlyTerminated = earlyTerminated;
}

Expand All @@ -99,7 +104,13 @@ public InternalComposite(StreamInput in) throws IOException {
formats.add(in.readNamedWriteable(DocValueFormat.class));
}
this.reverseMuls = in.readIntArray();
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls));
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
this.missingOrders = in.readArray(MissingOrder::readFromStream, MissingOrder[]::new);
} else {
this.missingOrders = new MissingOrder[reverseMuls.length];
Arrays.fill(this.missingOrders, MissingOrder.DEFAULT);
}
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls, missingOrders));
this.afterKey = in.readBoolean() ? new CompositeKey(in) : null;
this.earlyTerminated = in.getVersion().onOrAfter(LegacyESVersion.V_7_6_0) ? in.readBoolean() : false;
}
Expand All @@ -112,6 +123,9 @@ protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
}
out.writeIntArray(reverseMuls);
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
out.writeArray((output, order) -> order.writeTo(output), missingOrders);
}
out.writeList(buckets);
out.writeBoolean(afterKey != null);
if (afterKey != null) {
Expand Down Expand Up @@ -140,7 +154,18 @@ public InternalComposite create(List<InternalBucket> newBuckets) {
* keep the <code>afterKey</code> of the original aggregation in order
* to be able to retrieve the next page even if all buckets have been filtered.
*/
return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey, reverseMuls, earlyTerminated, getMetadata());
return new InternalComposite(
name,
size,
sourceNames,
formats,
newBuckets,
afterKey,
reverseMuls,
missingOrders,
earlyTerminated,
getMetadata()
);
}

@Override
Expand All @@ -150,6 +175,7 @@ public InternalBucket createBucket(InternalAggregations aggregations, InternalBu
prototype.formats,
prototype.key,
prototype.reverseMuls,
prototype.missingOrders,
prototype.docCount,
aggregations
);
Expand Down Expand Up @@ -235,7 +261,18 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
lastKey = lastBucket.getRawKey();
}
reduceContext.consumeBucketsAndMaybeBreak(result.size());
return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls, earlyTerminated, metadata);
return new InternalComposite(
name,
size,
sourceNames,
reducedFormats,
result,
lastKey,
reverseMuls,
missingOrders,
earlyTerminated,
metadata
);
}

@Override
Expand All @@ -253,7 +290,7 @@ protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContex
* just whatever formats make sense for *its* index. This can be real
* trouble when the index doing the reducing is unmapped. */
List<DocValueFormat> reducedFormats = buckets.get(0).formats;
return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, docCount, aggs);
return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, missingOrders, docCount, aggs);
}

@Override
Expand All @@ -266,12 +303,13 @@ public boolean equals(Object obj) {
return Objects.equals(size, that.size)
&& Objects.equals(buckets, that.buckets)
&& Objects.equals(afterKey, that.afterKey)
&& Arrays.equals(reverseMuls, that.reverseMuls);
&& Arrays.equals(reverseMuls, that.reverseMuls)
&& Arrays.equals(missingOrders, that.missingOrders);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls));
return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls), Arrays.hashCode(missingOrders));
}

private static class BucketIterator implements Comparable<BucketIterator> {
Expand Down Expand Up @@ -301,6 +339,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
private final long docCount;
private final InternalAggregations aggregations;
private final transient int[] reverseMuls;
private final transient MissingOrder[] missingOrders;
private final transient List<String> sourceNames;
private final transient List<DocValueFormat> formats;

Expand All @@ -309,22 +348,31 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
List<DocValueFormat> formats,
CompositeKey key,
int[] reverseMuls,
MissingOrder[] missingOrders,
long docCount,
InternalAggregations aggregations
) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
this.reverseMuls = reverseMuls;
this.missingOrders = missingOrders;
this.sourceNames = sourceNames;
this.formats = formats;
}

InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats, int[] reverseMuls) throws IOException {
InternalBucket(
StreamInput in,
List<String> sourceNames,
List<DocValueFormat> formats,
int[] reverseMuls,
MissingOrder[] missingOrders
) throws IOException {
this.key = new CompositeKey(in);
this.docCount = in.readVLong();
this.aggregations = InternalAggregations.readFrom(in);
this.reverseMuls = reverseMuls;
this.missingOrders = missingOrders;
this.sourceNames = sourceNames;
this.formats = formats;
}
Expand Down Expand Up @@ -400,13 +448,15 @@ List<DocValueFormat> getFormats() {
@Override
public int compareKey(InternalBucket other) {
for (int i = 0; i < key.size(); i++) {
if (key.get(i) == null) {
if (other.key.get(i) == null) {
// lambda function require final variable.
final int index = i;
int result = missingOrders[i].compare(() -> key.get(index) == null, () -> other.key.get(index) == null, reverseMuls[i]);
if (MissingOrder.unknownOrder(result) == false) {
if (result == 0) {
continue;
} else {
return result;
}
return -1 * reverseMuls[i];
} else if (other.key.get(i) == null) {
return reverseMuls[i];
}
assert key.get(i).getClass() == other.key.get(i).getClass();
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.ParsedAggregation;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.test.InternalMultiBucketAggregationTestCase;
import org.junit.After;

Expand Down Expand Up @@ -71,6 +72,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
private List<String> sourceNames;
private List<DocValueFormat> formats;
private int[] reverseMuls;
private MissingOrder[] missingOrders;
private int[] types;
private int size;

Expand Down Expand Up @@ -100,10 +102,12 @@ public void setUp() throws Exception {
sourceNames = new ArrayList<>();
formats = new ArrayList<>();
reverseMuls = new int[numFields];
missingOrders = new MissingOrder[numFields];
types = new int[numFields];
for (int i = 0; i < numFields; i++) {
sourceNames.add("field_" + i);
reverseMuls[i] = randomBoolean() ? 1 : -1;
missingOrders[i] = randomFrom(MissingOrder.values());
int type = randomIntBetween(0, 2);
types[i] = type;
formats.add(randomDocValueFormat(type == 0));
Expand Down Expand Up @@ -182,14 +186,26 @@ protected InternalComposite createTestInstance(String name, Map<String, Object>
formats,
key,
reverseMuls,
missingOrders,
1L,
aggregations
);
buckets.add(bucket);
}
Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2));
CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size() - 1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(), metadata);
return new InternalComposite(
name,
size,
sourceNames,
formats,
buckets,
lastBucket,
reverseMuls,
missingOrders,
randomBoolean(),
metadata
);
}

@Override
Expand All @@ -214,6 +230,7 @@ protected InternalComposite mutateInstance(InternalComposite instance) throws IO
formats,
createCompositeKey(),
reverseMuls,
missingOrders,
randomLongBetween(1, 100),
InternalAggregations.EMPTY
)
Expand All @@ -239,6 +256,7 @@ protected InternalComposite mutateInstance(InternalComposite instance) throws IO
buckets,
lastBucket,
reverseMuls,
missingOrders,
randomBoolean(),
metadata
);
Expand Down Expand Up @@ -295,6 +313,7 @@ public void testReduceUnmapped() throws IOException {
emptyList(),
null,
reverseMuls,
missingOrders,
true,
emptyMap()
);
Expand Down