Skip to content

Commit

Permalink
Always return the after_key in composite aggregation response (#28358)
Browse files Browse the repository at this point in the history
This change adds the `after_key` of a composite aggregation directly in the response.
It is redundant when all buckets are not filtered/removed by a pipeline aggregation since in this case the `after_key` is always the last bucket
in the response. Though when using a pipeline aggregation to filter composite buckets, the `after_key` can be lost if the last bucket is filtered.
This commit fixes this situation by always returning the `after_key` in a dedicated section.
  • Loading branch information
jimczi authored Jan 25, 2018
1 parent 75116a2 commit c26d4ac
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 26 deletions.
17 changes: 15 additions & 2 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ GET /_search
...
"aggregations": {
"my_buckets": {
"after_key": { <1>
"date": 1494288000000,
"product": "mad max"
},
"buckets": [
{
"key": {
Expand All @@ -403,7 +407,7 @@ GET /_search
"doc_count": 1
},
{
"key": { <1>
"key": {
"date": 1494288000000,
"product": "mad max"
},
Expand All @@ -418,9 +422,14 @@ GET /_search

<1> The last composite bucket returned by the query.

NOTE: The `after_key` is equals to the last bucket returned in the response before
any filtering that could be done by <<search-aggregations-pipeline, Pipeline aggregations>>.
If all buckets are filtered/removed by a pipeline aggregation, the `after_key` will contain
the last bucket before filtering.

The `after` parameter can be used to retrieve the composite buckets that are **after**
the last composite buckets returned in a previous round.
For the example below the last bucket is `"key": [1494288000000, "mad max"]` so the next
For the example below the last bucket can be found in `after_key` and the next
round of result can be retrieved with:

[source,js]
Expand Down Expand Up @@ -485,6 +494,10 @@ GET /_search
...
"aggregations": {
"my_buckets": {
"after_key": {
"date": 1494201600000,
"product": "rocky"
},
"buckets": [
{
"key": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ setup:
---
"Aggregate After Missing":
- skip:
version: " - 6.99.99"
reason: bug fixed in 7.0.0
version: " - 6.1.99"
reason: bug fixed in 6.2.0


- do:
Expand Down Expand Up @@ -295,3 +295,31 @@ setup:
- length: { aggregations.test.buckets: 1 }
- match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
- match: { aggregations.test.buckets.0.doc_count: 1 }

---
"Composite aggregation with after_key in the response":
- skip:
version: " - 6.99.99"
reason: starting in 7.0.0 after_key is returned in the response

- do:
search:
index: test
body:
aggregations:
test:
composite:
sources: [
{
"keyword": {
"terms": {
"field": "keyword",
}
}
}
]

- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- length: { aggregations.test.after_key: 1 }
- match: { aggregations.test.after_key.keyword: "foo" }
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ static XContentBuilder bucketToXContent(CompositeAggregation.Bucket bucket,
}

static XContentBuilder toXContentFragment(CompositeAggregation aggregation, XContentBuilder builder, Params params) throws IOException {
if (aggregation.afterKey() != null) {
buildCompositeMap("after_key", aggregation.afterKey(), builder);
}
builder.startArray(CommonFields.BUCKETS.getPreferredName());
for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) {
bucketToXContent(bucket, builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,15 @@ public InternalAggregation buildAggregation(long zeroBucket) throws IOException
int docCount = bucketDocCount(slot);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
}
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), reverseMuls,
CompositeKey lastBucket = num > 0 ? buckets[num-1].getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls,
pipelineAggregators(), metaData());
}

@Override
public InternalAggregation buildEmptyAggregation() {
final int[] reverseMuls = getReverseMuls();
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), reverseMuls,
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls,
pipelineAggregators(), metaData());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,38 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Arrays;

/**
* A key that is composed of multiple {@link Comparable} values.
*/
class CompositeKey {
class CompositeKey implements Writeable {
private final Comparable<?>[] values;

CompositeKey(Comparable<?>... values) {
this.values = values;
}

CompositeKey(StreamInput in) throws IOException {
values = new Comparable<?>[in.readVInt()];
for (int i = 0; i < values.length; i++) {
values[i] = (Comparable<?>) in.readGenericValue();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(values.length);
for (int i = 0; i < values.length; i++) {
out.writeGenericValue(values[i]);
}
}

Comparable<?>[] values() {
return values;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -50,17 +49,19 @@ public class InternalComposite

private final int size;
private final List<InternalBucket> buckets;
private final CompositeKey afterKey;
private final int[] reverseMuls;
private final List<String> sourceNames;
private final List<DocValueFormat> formats;

InternalComposite(String name, int size, List<String> sourceNames, List<DocValueFormat> formats,
List<InternalBucket> buckets, int[] reverseMuls,
List<InternalBucket> buckets, CompositeKey afterKey, int[] reverseMuls,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.sourceNames = sourceNames;
this.formats = formats;
this.buckets = buckets;
this.afterKey = afterKey;
this.size = size;
this.reverseMuls = reverseMuls;
}
Expand All @@ -79,6 +80,11 @@ public InternalComposite(StreamInput in) throws IOException {
}
this.reverseMuls = in.readIntArray();
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls));
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
this.afterKey = in.readBoolean() ? new CompositeKey(in) : null;
} else {
this.afterKey = buckets.size() > 0 ? buckets.get(buckets.size()-1).key : null;
}
}

@Override
Expand All @@ -92,6 +98,12 @@ protected void doWriteTo(StreamOutput out) throws IOException {
}
out.writeIntArray(reverseMuls);
out.writeList(buckets);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(afterKey != null);
if (afterKey != null) {
afterKey.writeTo(out);
}
}
}

@Override
Expand All @@ -105,8 +117,14 @@ public String getWriteableName() {
}

@Override
public InternalComposite create(List<InternalBucket> buckets) {
return new InternalComposite(name, size, sourceNames, formats, buckets, reverseMuls, pipelineAggregators(), getMetaData());
public InternalComposite create(List<InternalBucket> newBuckets) {
/**
* This is used by pipeline aggregations to filter/remove buckets so we
* keep the <code>afterKey</code> of the original aggregation in order
* to be able to retrieve the next page even if all buckets have been filtered.
*/
return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey,
reverseMuls, pipelineAggregators(), getMetaData());
}

@Override
Expand All @@ -126,7 +144,10 @@ public List<InternalBucket> getBuckets() {

@Override
public Map<String, Object> afterKey() {
return buckets.size() > 0 ? buckets.get(buckets.size()-1).getKey() : null;
if (afterKey != null) {
return new ArrayMap(sourceNames, formats, afterKey.values());
}
return null;
}

// Visible for tests
Expand Down Expand Up @@ -169,20 +190,22 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
}
return new InternalComposite(name, size, sourceNames, formats, result, reverseMuls, pipelineAggregators(), metaData);
final CompositeKey lastKey = result.size() > 0 ? result.get(result.size()-1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData);
}

@Override
protected boolean doEquals(Object obj) {
InternalComposite that = (InternalComposite) obj;
return Objects.equals(size, that.size) &&
Objects.equals(buckets, that.buckets) &&
Objects.equals(afterKey, that.afterKey) &&
Arrays.equals(reverseMuls, that.reverseMuls);
}

@Override
protected int doHashCode() {
return Objects.hash(size, buckets, Arrays.hashCode(reverseMuls));
return Objects.hash(size, buckets, afterKey, Arrays.hashCode(reverseMuls));
}

private static class BucketIterator implements Comparable<BucketIterator> {
Expand Down Expand Up @@ -226,11 +249,7 @@ static class InternalBucket extends InternalMultiBucketAggregation.InternalBucke

@SuppressWarnings("unchecked")
InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats, int[] reverseMuls) throws IOException {
final Comparable<?>[] values = new Comparable<?>[in.readVInt()];
for (int i = 0; i < values.length; i++) {
values[i] = (Comparable<?>) in.readGenericValue();
}
this.key = new CompositeKey(values);
this.key = new CompositeKey(in);
this.docCount = in.readVLong();
this.aggregations = InternalAggregations.readAggregations(in);
this.reverseMuls = reverseMuls;
Expand All @@ -240,10 +259,7 @@ static class InternalBucket extends InternalMultiBucketAggregation.InternalBucke

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(key.size());
for (int i = 0; i < key.size(); i++) {
out.writeGenericValue(key.get(i));
}
key.writeTo(out);
out.writeVLong(docCount);
aggregations.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -33,15 +34,26 @@ public class ParsedComposite extends ParsedMultiBucketAggregation<ParsedComposit
new ObjectParser<>(ParsedComposite.class.getSimpleName(), true, ParsedComposite::new);

static {
PARSER.declareField(ParsedComposite::setAfterKey, (p, c) -> p.mapOrdered(), new ParseField("after_key"),
ObjectParser.ValueType.OBJECT);
declareMultiBucketAggregationFields(PARSER,
parser -> ParsedComposite.ParsedBucket.fromXContent(parser),
parser -> null
);
}

private Map<String, Object> afterKey;

public static ParsedComposite fromXContent(XContentParser parser, String name) throws IOException {
ParsedComposite aggregation = PARSER.parse(parser, null);
aggregation.setName(name);
if (aggregation.afterKey == null && aggregation.getBuckets().size() > 0) {
/**
* Previous versions (< 6.3) don't send <code>afterKey</code>
* in the response so we set it as the last returned buckets.
*/
aggregation.setAfterKey(aggregation.getBuckets().get(aggregation.getBuckets().size()-1).key);
}
return aggregation;
}

Expand All @@ -57,9 +69,16 @@ public List<ParsedBucket> getBuckets() {

@Override
public Map<String, Object> afterKey() {
if (afterKey != null) {
return afterKey;
}
return buckets.size() > 0 ? buckets.get(buckets.size()-1).getKey() : null;
}

private void setAfterKey(Map<String, Object> afterKey) {
this.afterKey = afterKey;
}

@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
return CompositeAggregation.toXContentFragment(this, builder, params);
Expand Down
Loading

0 comments on commit c26d4ac

Please sign in to comment.