Change internal representation of bucket key of time_series agg #91407

Merged · 8 commits · Nov 10, 2022
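In short: the time_series aggregation's bucket key is now kept in its encoded tsid form (a BytesRef) instead of a decoded Map<String, Object>. The key is only decoded via TimeSeriesIdFieldMapper.decodeTsid when it is rendered, and reduce() now merges the tsid-sorted bucket lists of the shard responses with a priority queue instead of grouping decoded keys in a hash map. The central field change in InternalTimeSeries.InternalBucket, as a before/after excerpt of the diff below:

    // before: the bucket stored the decoded tsid
    protected final Map<String, Object> key;

    // after: the bucket stores the encoded tsid and decodes it on demand
    protected final BytesRef key;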
InternalTimeSeries.java (package org.elasticsearch.aggregations.bucket.timeseries)
@@ -8,12 +8,16 @@

package org.elasticsearch.aggregations.bucket.timeseries;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.XContentBuilder;

@@ -47,11 +51,12 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalT
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket {
protected long bucketOrd;
protected final boolean keyed;
protected final Map<String, Object> key;
protected final BytesRef key;
// TODO: make computing docCount optional
protected long docCount;
protected InternalAggregations aggregations;

public InternalBucket(Map<String, Object> key, long docCount, InternalAggregations aggregations, boolean keyed) {
public InternalBucket(BytesRef key, long docCount, InternalAggregations aggregations, boolean keyed) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
@@ -63,26 +68,26 @@ public InternalBucket(Map<String, Object> key, long docCount, InternalAggregatio
*/
public InternalBucket(StreamInput in, boolean keyed) throws IOException {
this.keyed = keyed;
key = in.readOrderedMap(StreamInput::readString, StreamInput::readGenericValue);
key = in.readBytesRef();
docCount = in.readVLong();
aggregations = InternalAggregations.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(key, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeBytesRef(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}

@Override
public Map<String, Object> getKey() {
return key;
return TimeSeriesIdFieldMapper.decodeTsid(key);
}

@Override
public String getKeyAsString() {
return key.toString();
return getKey().toString();
}
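As a standalone sketch of the decode-on-read pattern used by getKey() and getKeyAsString() above (the "dim=value,dim=value" encoding below is hypothetical; the real key is the tsid produced by TimeSeriesIdFieldMapper):

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch: the bucket keeps only the compact encoded key and
// materializes a Map when a caller actually asks for it.
final class LazyKeyBucketSketch {
    private final byte[] encodedKey;

    LazyKeyBucketSketch(byte[] encodedKey) {
        this.encodedKey = encodedKey;
    }

    // Decoding happens on demand, mirroring getKey() above.
    Map<String, Object> getKey() {
        Map<String, Object> decoded = new LinkedHashMap<>();
        for (String pair : new String(encodedKey, StandardCharsets.UTF_8).split(",")) {
            String[] kv = pair.split("=", 2);
            decoded.put(kv[0], kv[1]);
        }
        return decoded;
    }

    String getKeyAsString() {
        return getKey().toString();
    }

    public static void main(String[] args) {
        var bucket = new LazyKeyBucketSketch("host=a,pod=p1".getBytes(StandardCharsets.UTF_8));
        System.out.println(bucket.getKeyAsString()); // {host=a, pod=p1}
    }
}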

@Override
@@ -97,8 +102,10 @@ public InternalAggregations getAggregations() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// Use map key in the xcontent response:
var key = getKey();
if (keyed) {
builder.startObject(getKeyAsString());
builder.startObject(key.toString());
} else {
builder.startObject();
}
@@ -187,38 +194,61 @@ protected void doWriteTo(StreamOutput out) throws IOException {

@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
// We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
// that in the future
Map<Map<String, Object>, List<InternalBucket>> bucketsList = null;
// TODO: optimize the single result case, either by adding an if check here and returning aggregations.get(0), or
// by overriding the mustReduceOnSingleInternalAgg() method
final int initialCapacity = aggregations.stream()
.map(value -> (InternalTimeSeries) value)
.mapToInt(value -> value.getBuckets().size())
.max()
.getAsInt();

final PriorityQueue<IteratorAndCurrent<InternalBucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurrent<InternalBucket> b) {
return a.current().key.compareTo(b.current().key) < 0;
}
};
for (InternalAggregation aggregation : aggregations) {
InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
if (bucketsList != null) {
for (InternalBucket bucket : timeSeries.buckets) {
bucketsList.compute(bucket.key, (map, list) -> {
if (list == null) {
list = new ArrayList<>();
}
list.add(bucket);
return list;
});
}
} else {
bucketsList = new HashMap<>(timeSeries.buckets.size());
for (InternalTimeSeries.InternalBucket bucket : timeSeries.buckets) {
List<InternalBucket> bucketList = new ArrayList<>();
bucketList.add(bucket);
bucketsList.put(bucket.key, bucketList);
}
if (timeSeries.buckets.isEmpty() == false) {
IteratorAndCurrent<InternalBucket> iterator = new IteratorAndCurrent<>(timeSeries.buckets.iterator());
pq.add(iterator);
}
}

reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
for (Map.Entry<Map<String, Object>, List<InternalBucket>> bucketEntry : bucketsList.entrySet()) {
reduced.buckets.add(reduceBucket(bucketEntry.getValue(), reduceContext));
InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata());
List<InternalBucket> bucketsWithSameKey = new ArrayList<>(aggregations.size());
BytesRef prevTsid = null;
while (pq.size() > 0) {
reduceContext.consumeBucketsAndMaybeBreak(1);
bucketsWithSameKey.clear();

while (bucketsWithSameKey.isEmpty() || bucketsWithSameKey.get(0).key.equals(pq.top().current().key)) {
IteratorAndCurrent<InternalBucket> iterator = pq.top();
bucketsWithSameKey.add(iterator.current());
if (iterator.hasNext()) {
iterator.next();
pq.updateTop();
} else {
pq.pop();
if (pq.size() == 0) {
break;
}
}
}

InternalBucket reducedBucket;
if (bucketsWithSameKey.size() == 1) {
reducedBucket = bucketsWithSameKey.get(0);
} else {
reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
}
BytesRef tsid = reducedBucket.key;
assert prevTsid == null || tsid.compareTo(prevTsid) > 0;
reduced.buckets.add(reducedBucket);
prevTsid = tsid;
}
return reduced;

}
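The reduce above is essentially a k-way merge over per-response bucket lists that are already sorted by tsid. A minimal standalone sketch of the same pattern, using java.util.PriorityQueue and a tiny stand-in for IteratorAndCurrent rather than the Lucene and Elasticsearch types from the diff:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Standalone sketch of the merge performed in reduce(): every input list is
// already sorted by key, so a priority queue of per-list iterators yields keys
// in global order, and buckets sharing a key can be combined as they surface.
final class SortedBucketMergeSketch {

    record Bucket(String key, long docCount) {}

    // Tiny stand-in for the IteratorAndCurrent helper used in the real code.
    static final class PeekingIterator {
        final Iterator<Bucket> it;
        Bucket current;

        PeekingIterator(Iterator<Bucket> it) {
            this.it = it;
            this.current = it.next();
        }

        boolean advance() {
            if (it.hasNext()) {
                current = it.next();
                return true;
            }
            return false;
        }
    }

    static List<Bucket> merge(List<List<Bucket>> inputs) {
        PriorityQueue<PeekingIterator> pq = new PriorityQueue<>(
            (a, b) -> a.current.key().compareTo(b.current.key())
        );
        for (List<Bucket> in : inputs) {
            if (in.isEmpty() == false) {
                pq.add(new PeekingIterator(in.iterator()));
            }
        }
        List<Bucket> reduced = new ArrayList<>();
        while (pq.isEmpty() == false) {
            String key = pq.peek().current.key();
            long docCount = 0;
            // Drain every bucket that shares the smallest key, then emit one merged bucket.
            while (pq.isEmpty() == false && pq.peek().current.key().equals(key)) {
                PeekingIterator top = pq.poll();
                docCount += top.current.docCount();
                if (top.advance()) {
                    pq.add(top); // re-insert with its next (larger) bucket
                }
            }
            reduced.add(new Bucket(key, docCount));
        }
        return reduced;
    }

    public static void main(String[] args) {
        List<Bucket> a = List.of(new Bucket("1", 3), new Bucket("10", 6), new Bucket("2", 2));
        List<Bucket> b = List.of(new Bucket("2", 1), new Bucket("3", 3));
        // Prints buckets for keys "1", "10", "2", "3" with doc counts 3, 6, 3, 3.
        System.out.println(merge(List.of(a, b)));
    }
}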

@Override
TimeSeriesAggregator.java
@@ -10,7 +10,6 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -32,7 +31,6 @@ public class TimeSeriesAggregator extends BucketsAggregator {
protected final BytesKeyedBucketOrds bucketOrds;
private final boolean keyed;

@SuppressWarnings("unchecked")
public TimeSeriesAggregator(
String name,
AggregatorFactories factories,
@@ -49,16 +47,19 @@ public TimeSeriesAggregator(

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
BytesRef spare = new BytesRef();
InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
BytesRef spareKey = new BytesRef();
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
BytesRef prev = null;
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
ordsEnum.readValue(spareKey);
ordsEnum.readValue(spare);
assert prev == null || spare.compareTo(prev) > 0
: "key [" + spare.utf8ToString() + "] is smaller than previous key [" + prev.utf8ToString() + "]";
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
TimeSeriesIdFieldMapper.decodeTsid(spareKey),
prev = BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
docCount,
null,
keyed
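The BytesRef.deepCopyOf above is needed presumably because readValue only points the shared spare at bytes owned by bucketOrds, which are reused per bucket and released when bucketOrds is closed (the diff's comment notes that closing bucketOrds corrupts the bytes ref). A standalone sketch, with plain byte arrays rather than Elasticsearch types, of what goes wrong when the reused buffer is stored without a copy:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of why a reused "spare" buffer must be deep-copied before
// it is stored: the producer overwrites the same bytes on every iteration.
final class SpareBufferCopySketch {
    public static void main(String[] args) {
        byte[] spare = new byte[1];
        List<byte[]> aliased = new ArrayList<>();
        List<byte[]> copied = new ArrayList<>();
        for (byte b = 'a'; b <= 'c'; b++) {
            spare[0] = b;                                   // the producer reuses this buffer
            aliased.add(spare);                             // stores a reference: every entry sees the last value
            copied.add(Arrays.copyOf(spare, spare.length)); // deep copy, analogous to BytesRef.deepCopyOf
        }
        System.out.println(aliased.stream().map(String::new).toList()); // [c, c, c]
        System.out.println(copied.stream().map(String::new).toList());  // [a, b, c]
    }
}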
InternalTimeSeriesTests.java (package org.elasticsearch.aggregations.bucket.timeseries)
@@ -8,19 +8,32 @@

package org.elasticsearch.aggregations.bucket.timeseries;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.aggregations.bucket.AggregationMultiBucketAggregationTestCase;
import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries.InternalBucket;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xcontent.ContextParser;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

public class InternalTimeSeriesTests extends AggregationMultiBucketAggregationTestCase<InternalTimeSeries> {

@@ -29,14 +42,25 @@ protected Map.Entry<String, ContextParser<Object, Aggregation>> getParser() {
return Map.entry(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c));
}

private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
int numberOfBuckets = randomNumberOfBuckets();
List<InternalTimeSeries.InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
List<InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
for (int j = 0; j < numberOfBuckets; j++) {
long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
bucketList.add(new InternalTimeSeries.InternalBucket(keys.get(j), docCount, aggregations, keyed));
var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
for (var entry : keys.get(j).entrySet()) {
builder.addString(entry.getKey(), (String) entry.getValue());
}
try {
var key = builder.build().toBytesRef();
bucketList.add(new InternalBucket(key, docCount, aggregations, keyed));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
// The internal time series agg's reduce method expects the buckets of each shard-level response to be sorted by tsid:
bucketList.sort(Comparator.comparing(o -> o.key));
return bucketList;
}

@@ -68,7 +92,7 @@ protected InternalTimeSeries createTestInstance(String name, Map<String, Object>
protected void assertReduced(InternalTimeSeries reduced, List<InternalTimeSeries> inputs) {
Map<Map<String, Object>, Long> keys = new HashMap<>();
for (InternalTimeSeries in : inputs) {
for (InternalTimeSeries.InternalBucket bucket : in.getBuckets()) {
for (InternalBucket bucket : in.getBuckets()) {
keys.compute(bucket.getKey(), (k, v) -> {
if (v == null) {
return bucket.docCount;
@@ -79,7 +103,7 @@ }
}
}
assertThat(
reduced.getBuckets().stream().map(InternalTimeSeries.InternalBucket::getKey).toArray(Object[]::new),
reduced.getBuckets().stream().map(InternalBucket::getKey).toArray(Object[]::new),
arrayContainingInAnyOrder(keys.keySet().toArray(Object[]::new))
);
}
@@ -93,4 +117,58 @@ protected Class<ParsedTimeSeries> implementationClass() {
protected Predicate<String> excludePathsFromXContentInsertion() {
return s -> s.endsWith(".key");
}

public void testReduceSimple() {
// A simple test to easily spot mistakes in the merge logic of the InternalTimeSeries#reduce(...) method.
InternalTimeSeries first = new InternalTimeSeries(
"ts",
List.of(
new InternalBucket(new BytesRef("1"), 3, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("10"), 6, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("2"), 2, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("9"), 5, InternalAggregations.EMPTY, false)
),
false,
Map.of()
);
InternalTimeSeries second = new InternalTimeSeries(
"ts",
List.of(
new InternalBucket(new BytesRef("2"), 1, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("3"), 3, InternalAggregations.EMPTY, false)
),
false,
Map.of()
);
InternalTimeSeries third = new InternalTimeSeries(
"ts",
List.of(
new InternalBucket(new BytesRef("1"), 2, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("3"), 4, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("9"), 4, InternalAggregations.EMPTY, false)
),
false,
Map.of()
);
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
mockScriptService(),
() -> false,
new TimeSeriesAggregationBuilder("ts"),
value -> {},
PipelineAggregator.PipelineTree.EMPTY
);

InternalTimeSeries result = (InternalTimeSeries) first.reduce(List.of(first, second, third), context);
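// Expected order follows the byte-wise BytesRef comparison, so "10" sorts before "2";
// doc counts are summed per tsid: "1" -> 3 + 2, "2" -> 2 + 1, "3" -> 3 + 4, "9" -> 5 + 4, "10" -> 6.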
assertThat(result.getBuckets().get(0).key.utf8ToString(), equalTo("1"));
assertThat(result.getBuckets().get(0).getDocCount(), equalTo(5L));
assertThat(result.getBuckets().get(1).key.utf8ToString(), equalTo("10"));
assertThat(result.getBuckets().get(1).getDocCount(), equalTo(6L));
assertThat(result.getBuckets().get(2).key.utf8ToString(), equalTo("2"));
assertThat(result.getBuckets().get(2).getDocCount(), equalTo(3L));
assertThat(result.getBuckets().get(3).key.utf8ToString(), equalTo("3"));
assertThat(result.getBuckets().get(3).getDocCount(), equalTo(7L));
assertThat(result.getBuckets().get(4).key.utf8ToString(), equalTo("9"));
assertThat(result.getBuckets().get(4).getDocCount(), equalTo(9L));
}
}