Change internal representation of bucket key of time_series agg
Currently, the key is a map, which can make reducing large responses
more memory intensive than it should be. Also, the data structures
used during reduce are not backed by big arrays, so they are not
accounted for.

This commit changes how the key is represented internally,
by using BytesRef instead of Map. This commit doesn't change
how the key is represented in the response. It also changes
the reduce method to take advantage of the fact that bucket
keys are now bytes refs.

Relates to elastic#74660
martijnvg committed Nov 8, 2022
1 parent a8a684e commit f8dfc46
Showing 3 changed files with 65 additions and 38 deletions.
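
For context, the sketch below (not part of the commit) shows the tsid round trip the new key representation relies on: dimension name/value pairs are encoded into a BytesRef via TimeSeriesIdFieldMapper.TimeSeriesIdBuilder, and decodeTsid turns that BytesRef back into the map that getKey() still exposes. It assumes the Elasticsearch source tree is on the classpath; the class name and dimension values are made up for illustration.

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;

import java.io.IOException;
import java.util.Map;

public class TsidRoundTripSketch {
    public static void main(String[] args) throws IOException {
        // Encode dimension name/value pairs into a _tsid, the same way the updated test does.
        var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
        builder.addString("host", "a");
        builder.addString("pod", "p-1");
        BytesRef tsid = builder.build().toBytesRef();

        // The bucket now stores this BytesRef directly; the map form is only computed on demand.
        Map<String, Object> key = TimeSeriesIdFieldMapper.decodeTsid(tsid);
        System.out.println(key);
    }
}
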
@@ -8,8 +8,13 @@

package org.elasticsearch.search.aggregations.timeseries;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -46,11 +51,12 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalT
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements TimeSeries.Bucket {
protected long bucketOrd;
protected final boolean keyed;
protected final Map<String, Object> key;
protected final BytesRef key;
// TODO: make computing docCount optional
protected long docCount;
protected InternalAggregations aggregations;

public InternalBucket(Map<String, Object> key, long docCount, InternalAggregations aggregations, boolean keyed) {
public InternalBucket(BytesRef key, long docCount, InternalAggregations aggregations, boolean keyed) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
@@ -62,26 +68,26 @@ public InternalBucket(Map<String, Object> key, long docCount, InternalAggregatio
*/
public InternalBucket(StreamInput in, boolean keyed) throws IOException {
this.keyed = keyed;
key = in.readOrderedMap(StreamInput::readString, StreamInput::readGenericValue);
key = in.readBytesRef();
docCount = in.readVLong();
aggregations = InternalAggregations.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(key, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeBytesRef(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}

@Override
public Map<String, Object> getKey() {
return key;
return TimeSeriesIdFieldMapper.decodeTsid(key);
}

@Override
public String getKeyAsString() {
return key.toString();
return getKey().toString();
}

@Override
@@ -96,8 +102,10 @@ public InternalAggregations getAggregations() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// Use map key in the xcontent response:
var key = getKey();
if (keyed) {
builder.startObject(getKeyAsString());
builder.startObject(key.toString());
} else {
builder.startObject();
}
@@ -186,38 +194,46 @@ protected void doWriteTo(StreamOutput out) throws IOException {

@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
// We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
// that in the future
Map<Map<String, Object>, List<InternalBucket>> bucketsList = null;
for (InternalAggregation aggregation : aggregations) {
InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
if (bucketsList != null) {
for (InternalBucket bucket : timeSeries.buckets) {
bucketsList.compute(bucket.key, (map, list) -> {
if (list == null) {
list = new ArrayList<>();
// TODO: optimize single result case either by having a if check here and return aggregations.get(0) or
// by overwriting the mustReduceOnSingleInternalAgg() method

final BigArrays bigArrays = reduceContext.bigArrays();
final int initialCapacity = aggregations.stream()
.map(value -> (InternalTimeSeries) value)
.mapToInt(value -> value.getBuckets().size())
.max()
.getAsInt();
try (LongObjectPagedHashMap<List<InternalBucket>> tsKeyToBuckets = new LongObjectPagedHashMap<>(initialCapacity, bigArrays)) {
final int numTsids;
// We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
// that in the future
try (BytesRefHash tsids = new BytesRefHash(initialCapacity, bigArrays)) {
for (int i = 0; i < aggregations.size(); i++) {
InternalTimeSeries timeSeries = (InternalTimeSeries) aggregations.get(i);
for (int j = 0; j < timeSeries.getBuckets().size(); j++) {
InternalBucket bucket = timeSeries.getBuckets().get(j);
long key = tsids.add(bucket.key);
List<InternalBucket> buckets;
if (key < 0) {
key = -1 - key;
buckets = tsKeyToBuckets.get(key);
} else {
buckets = new ArrayList<>();
}
list.add(bucket);
return list;
});
}
} else {
bucketsList = new HashMap<>(timeSeries.buckets.size());
for (InternalTimeSeries.InternalBucket bucket : timeSeries.buckets) {
List<InternalBucket> bucketList = new ArrayList<>();
bucketList.add(bucket);
bucketsList.put(bucket.key, bucketList);
buckets.add(bucket);
tsKeyToBuckets.put(key, buckets);
}
}
numTsids = (int) tsids.size();
}
}

reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
for (Map.Entry<Map<String, Object>, List<InternalBucket>> bucketEntry : bucketsList.entrySet()) {
reduced.buckets.add(reduceBucket(bucketEntry.getValue(), reduceContext));
reduceContext.consumeBucketsAndMaybeBreak(numTsids);
InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(numTsids), keyed, getMetadata());
for (LongObjectPagedHashMap.Cursor<List<InternalBucket>> tsKeyToBucket : tsKeyToBuckets) {
reduced.buckets.add(reduceBucket(tsKeyToBucket.value, reduceContext));
}
return reduced;
}
return reduced;

}

@Override
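
As an aside, the new reduce leans on the BytesRefHash ordinal convention: add() returns a fresh ordinal for an unseen key and (-1 - existingOrdinal) for a duplicate, which is what the "key < 0" branch above unpacks. A minimal sketch of that convention (not the commit's code), assuming the Elasticsearch source tree on the classpath; BigArrays.NON_RECYCLING_INSTANCE is used only to keep the example short, whereas the real reduce uses reduceContext.bigArrays().

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;

public class BytesRefHashSketch {
    public static void main(String[] args) {
        try (BytesRefHash tsids = new BytesRefHash(16, BigArrays.NON_RECYCLING_INSTANCE)) {
            long first = tsids.add(new BytesRef("tsid-a")); // >= 0: newly inserted
            long dup = tsids.add(new BytesRef("tsid-a"));   // < 0: key was already present
            long existing = -1 - dup;                       // recover the original ordinal
            System.out.println(first + " " + dup + " " + existing); // prints: 0 -1 0
        }
    }
}
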
@@ -10,7 +10,6 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -51,14 +50,14 @@ public TimeSeriesAggregator(
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
BytesRef spareKey = new BytesRef();
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
BytesRef spareKey = new BytesRef();
ordsEnum.readValue(spareKey);
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
TimeSeriesIdFieldMapper.decodeTsid(spareKey),
spareKey,
docCount,
null,
keyed
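
One detail worth noting in the hunk above: the spare BytesRef is now allocated inside the while loop because the bucket retains it as its key, whereas previously the spare was immediately decoded into a fresh map. A minimal sketch of the pattern (not the commit's code), assuming the Elasticsearch source tree on the classpath; the import path for BytesKeyedBucketOrds is assumed, and collectKeys with its parameters is a hypothetical helper.

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds;

import java.util.ArrayList;
import java.util.List;

class SpareKeySketch {
    // Hypothetical helper: reads every key of one owning bucket out of BytesKeyedBucketOrds.
    static List<BytesRef> collectKeys(BytesKeyedBucketOrds bucketOrds, long owningBucketOrd) {
        List<BytesRef> keys = new ArrayList<>();
        BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
        while (ordsEnum.next()) {
            // A new BytesRef per iteration: readValue only repoints the instance it is given,
            // and the caller keeps these instances, so reusing one spare would alias all keys.
            BytesRef spareKey = new BytesRef();
            ordsEnum.readValue(spareKey);
            keys.add(spareKey);
        }
        return keys;
    }
}
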
@@ -8,9 +8,12 @@

package org.elasticsearch.search.aggregations.timeseries;

import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -28,7 +31,16 @@ private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, Int
List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
for (int j = 0; j < numberOfBuckets; j++) {
long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
bucketList.add(new InternalTimeSeries.InternalBucket(keys.get(j), docCount, aggregations, keyed));
var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
for (var entry : keys.get(j).entrySet()) {
builder.addString(entry.getKey(), (String) entry.getValue());
}
try {
var key = builder.build().toBytesRef();
bucketList.add(new InternalTimeSeries.InternalBucket(key, docCount, aggregations, keyed));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return bucketList;
}
