Change internal representation of bucket key of time_series agg (#91407)
Currently, the key is a map, which can make reducing large responses
more memory intensive than it should be. Also, the map used during the
reduce to detect duplicate buckets is not taken into account by the circuit breaker.
This map can become very large when reducing large shard-level responses.

This commit changes how the key is represented internally,
by using a BytesRef instead of a Map. It doesn't change
how the key is represented in the response. The reduce is also
changed to merge the shard responses without creating intermediate
data structures for detected duplicate buckets. This is possible
because the buckets in the shard-level responses are sorted by tsid.
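
To make the merge described above concrete, here is a minimal, self-contained sketch of combining pre-sorted shard-level bucket lists with a priority queue, without building an intermediate map of duplicates. The SortedBucketMerge, Bucket and Cursor names are illustrative stand-ins rather than the actual InternalTimeSeries types, and java.util.PriorityQueue is used in place of the Lucene PriorityQueue the real code relies on:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class SortedBucketMerge {

    // Illustrative stand-in for InternalTimeSeries.InternalBucket: a sortable key plus a doc count.
    record Bucket(String key, long docCount) {}

    // Small helper pairing an iterator with its current element, similar in spirit
    // to the IteratorAndCurrent class used in the actual reduce below.
    static final class Cursor {
        final Iterator<Bucket> it;
        Bucket current;

        Cursor(Iterator<Bucket> it) {
            this.it = it;
            this.current = it.next();
        }

        boolean advance() {
            if (it.hasNext()) {
                current = it.next();
                return true;
            }
            return false;
        }
    }

    static List<Bucket> merge(List<List<Bucket>> shardResponses) {
        // The queue is ordered by each cursor's current key, so the globally smallest
        // key is always at the head; no map keyed by tsid is ever built.
        PriorityQueue<Cursor> pq = new PriorityQueue<>((a, b) -> a.current.key().compareTo(b.current.key()));
        for (List<Bucket> buckets : shardResponses) {
            if (buckets.isEmpty() == false) {
                pq.add(new Cursor(buckets.iterator()));
            }
        }
        List<Bucket> reduced = new ArrayList<>();
        while (pq.isEmpty() == false) {
            // Collect every bucket that shares the current smallest key and sum their doc counts.
            String key = pq.peek().current.key();
            long docCount = 0;
            while (pq.isEmpty() == false && pq.peek().current.key().equals(key)) {
                Cursor cursor = pq.poll();
                docCount += cursor.current.docCount();
                if (cursor.advance()) {
                    pq.add(cursor);
                }
            }
            reduced.add(new Bucket(key, docCount));
        }
        return reduced;
    }

    public static void main(String[] args) {
        // Two "shard responses", each already sorted by key.
        List<Bucket> shard1 = List.of(new Bucket("a", 3), new Bucket("c", 2));
        List<Bucket> shard2 = List.of(new Bucket("a", 1), new Bucket("b", 4));
        // Prints buckets in key order with duplicate keys merged: a=4, b=4, c=2.
        System.out.println(merge(List.of(shard1, shard2)));
    }
}
```

The actual reduce below follows the same pattern, but compares BytesRef keys, reports each emitted bucket to the circuit breaker via consumeBucketsAndMaybeBreak, and reuses a bucket unchanged when only one shard contributed it.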

Relates to #74660
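
The other half of the change, holding the key internally as a BytesRef and decoding it to a map only when rendering the response, can be pictured with the hypothetical round-trip below. Only the TimeSeriesIdBuilder and decodeTsid calls mirror usages that appear in this diff; the class name, dimension values and surrounding setup are assumptions, and running it requires the Elasticsearch server classes on the classpath:

```java
// Hypothetical round-trip sketch; not part of the commit. The builder and
// decodeTsid calls mirror the usages in the diff below.
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;

import java.io.IOException;
import java.util.Map;

public class TsidRoundTrip {
    public static void main(String[] args) throws IOException {
        // Encode dimension name/value pairs into the binary tsid form,
        // as the test code below does when creating random buckets.
        var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
        builder.addString("host", "host-1");
        builder.addString("pod", "pod-a");
        BytesRef key = builder.build().toBytesRef();

        // Internally a bucket now only holds `key`; the map form is recreated
        // on demand, which is what InternalBucket#getKey() does for the response.
        Map<String, Object> asMap = TimeSeriesIdFieldMapper.decodeTsid(key);
        System.out.println(asMap);
    }
}
```

Keeping the key in its encoded form until the response is built is what allows the reduce to compare and sort buckets as raw bytes.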
martijnvg authored Nov 10, 2022
1 parent 9d0b0ba commit 3ece828
Showing 3 changed files with 151 additions and 42 deletions.
@@ -8,12 +8,16 @@

package org.elasticsearch.aggregations.bucket.timeseries;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.XContentBuilder;

@@ -47,11 +51,12 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalT
public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket {
protected long bucketOrd;
protected final boolean keyed;
protected final Map<String, Object> key;
protected final BytesRef key;
// TODO: make computing docCount optional
protected long docCount;
protected InternalAggregations aggregations;

public InternalBucket(Map<String, Object> key, long docCount, InternalAggregations aggregations, boolean keyed) {
public InternalBucket(BytesRef key, long docCount, InternalAggregations aggregations, boolean keyed) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
@@ -63,26 +68,26 @@ public InternalBucket(Map<String, Object> key, long docCount, InternalAggregatio
*/
public InternalBucket(StreamInput in, boolean keyed) throws IOException {
this.keyed = keyed;
key = in.readOrderedMap(StreamInput::readString, StreamInput::readGenericValue);
key = in.readBytesRef();
docCount = in.readVLong();
aggregations = InternalAggregations.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(key, StreamOutput::writeString, StreamOutput::writeGenericValue);
out.writeBytesRef(key);
out.writeVLong(docCount);
aggregations.writeTo(out);
}

@Override
public Map<String, Object> getKey() {
return key;
return TimeSeriesIdFieldMapper.decodeTsid(key);
}

@Override
public String getKeyAsString() {
return key.toString();
return getKey().toString();
}

@Override
@@ -97,8 +102,10 @@ public InternalAggregations getAggregations() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// Use map key in the xcontent response:
var key = getKey();
if (keyed) {
builder.startObject(getKeyAsString());
builder.startObject(key.toString());
} else {
builder.startObject();
}
@@ -187,38 +194,61 @@ protected void doWriteTo(StreamOutput out) throws IOException {

@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
// We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
// that in the future
Map<Map<String, Object>, List<InternalBucket>> bucketsList = null;
// TODO: optimize single result case either by adding an if check here and returning aggregations.get(0) or
// by overriding the mustReduceOnSingleInternalAgg() method
final int initialCapacity = aggregations.stream()
.map(value -> (InternalTimeSeries) value)
.mapToInt(value -> value.getBuckets().size())
.max()
.getAsInt();

final PriorityQueue<IteratorAndCurrent<InternalBucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurrent<InternalBucket> b) {
return a.current().key.compareTo(b.current().key) < 0;
}
};
for (InternalAggregation aggregation : aggregations) {
InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
if (bucketsList != null) {
for (InternalBucket bucket : timeSeries.buckets) {
bucketsList.compute(bucket.key, (map, list) -> {
if (list == null) {
list = new ArrayList<>();
}
list.add(bucket);
return list;
});
}
} else {
bucketsList = new HashMap<>(timeSeries.buckets.size());
for (InternalTimeSeries.InternalBucket bucket : timeSeries.buckets) {
List<InternalBucket> bucketList = new ArrayList<>();
bucketList.add(bucket);
bucketsList.put(bucket.key, bucketList);
}
if (timeSeries.buckets.isEmpty() == false) {
IteratorAndCurrent<InternalBucket> iterator = new IteratorAndCurrent<>(timeSeries.buckets.iterator());
pq.add(iterator);
}
}

reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
for (Map.Entry<Map<String, Object>, List<InternalBucket>> bucketEntry : bucketsList.entrySet()) {
reduced.buckets.add(reduceBucket(bucketEntry.getValue(), reduceContext));
InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata());
List<InternalBucket> bucketsWithSameKey = new ArrayList<>(aggregations.size());
BytesRef prevTsid = null;
while (pq.size() > 0) {
reduceContext.consumeBucketsAndMaybeBreak(1);
bucketsWithSameKey.clear();

while (bucketsWithSameKey.isEmpty() || bucketsWithSameKey.get(0).key.equals(pq.top().current().key)) {
IteratorAndCurrent<InternalBucket> iterator = pq.top();
bucketsWithSameKey.add(iterator.current());
if (iterator.hasNext()) {
iterator.next();
pq.updateTop();
} else {
pq.pop();
if (pq.size() == 0) {
break;
}
}
}

InternalBucket reducedBucket;
if (bucketsWithSameKey.size() == 1) {
reducedBucket = bucketsWithSameKey.get(0);
} else {
reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
}
BytesRef tsid = reducedBucket.key;
assert prevTsid == null || tsid.compareTo(prevTsid) > 0;
reduced.buckets.add(reducedBucket);
prevTsid = tsid;
}
return reduced;

}

@Override
@@ -10,7 +10,6 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -32,7 +31,6 @@ public class TimeSeriesAggregator extends BucketsAggregator {
protected final BytesKeyedBucketOrds bucketOrds;
private final boolean keyed;

@SuppressWarnings("unchecked")
public TimeSeriesAggregator(
String name,
AggregatorFactories factories,
@@ -49,16 +47,19 @@ public TimeSeriesAggregator(

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
BytesRef spare = new BytesRef();
InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
BytesRef spareKey = new BytesRef();
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
BytesRef prev = null;
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
ordsEnum.readValue(spareKey);
ordsEnum.readValue(spare);
assert prev == null || spare.compareTo(prev) > 0
: "key [" + spare.utf8ToString() + "] is smaller than previous key [" + prev.utf8ToString() + "]";
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
TimeSeriesIdFieldMapper.decodeTsid(spareKey),
prev = BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
docCount,
null,
keyed
@@ -8,19 +8,32 @@

package org.elasticsearch.aggregations.bucket.timeseries;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.aggregations.bucket.AggregationMultiBucketAggregationTestCase;
import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries.InternalBucket;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xcontent.ContextParser;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

public class InternalTimeSeriesTests extends AggregationMultiBucketAggregationTestCase<InternalTimeSeries> {

@@ -29,14 +42,25 @@ protected Map.Entry<String, ContextParser<Object, Aggregation>> getParser() {
return Map.entry(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c));
}

private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
int numberOfBuckets = randomNumberOfBuckets();
List<InternalTimeSeries.InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
List<InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
for (int j = 0; j < numberOfBuckets; j++) {
long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
bucketList.add(new InternalTimeSeries.InternalBucket(keys.get(j), docCount, aggregations, keyed));
var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
for (var entry : keys.get(j).entrySet()) {
builder.addString(entry.getKey(), (String) entry.getValue());
}
try {
var key = builder.build().toBytesRef();
bucketList.add(new InternalBucket(key, docCount, aggregations, keyed));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
// The internal time series' reduce method expects the buckets in each shard-level response to be sorted by tsid:
bucketList.sort(Comparator.comparing(o -> o.key));
return bucketList;
}

@@ -68,7 +92,7 @@ protected InternalTimeSeries createTestInstance(String name, Map<String, Object>
protected void assertReduced(InternalTimeSeries reduced, List<InternalTimeSeries> inputs) {
Map<Map<String, Object>, Long> keys = new HashMap<>();
for (InternalTimeSeries in : inputs) {
for (InternalTimeSeries.InternalBucket bucket : in.getBuckets()) {
for (InternalBucket bucket : in.getBuckets()) {
keys.compute(bucket.getKey(), (k, v) -> {
if (v == null) {
return bucket.docCount;
@@ -79,7 +103,7 @@ }
}
}
assertThat(
reduced.getBuckets().stream().map(InternalTimeSeries.InternalBucket::getKey).toArray(Object[]::new),
reduced.getBuckets().stream().map(InternalBucket::getKey).toArray(Object[]::new),
arrayContainingInAnyOrder(keys.keySet().toArray(Object[]::new))
);
}
@@ -93,4 +117,58 @@ protected Class<ParsedTimeSeries> implementationClass() {
protected Predicate<String> excludePathsFromXContentInsertion() {
return s -> s.endsWith(".key");
}

public void testReduceSimple() {
// A simple test to easily spot mistakes in the merge logic of the InternalTimeSeries#reduce(...) method.
InternalTimeSeries first = new InternalTimeSeries(
"ts",
List.of(
new InternalBucket(new BytesRef("1"), 3, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("10"), 6, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("2"), 2, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("9"), 5, InternalAggregations.EMPTY, false)
),
false,
Map.of()
);
InternalTimeSeries second = new InternalTimeSeries(
"ts",
List.of(
new InternalBucket(new BytesRef("2"), 1, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("3"), 3, InternalAggregations.EMPTY, false)
),
false,
Map.of()
);
InternalTimeSeries third = new InternalTimeSeries(
"ts",
List.of(
new InternalBucket(new BytesRef("1"), 2, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("3"), 4, InternalAggregations.EMPTY, false),
new InternalBucket(new BytesRef("9"), 4, InternalAggregations.EMPTY, false)
),
false,
Map.of()
);
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
mockScriptService(),
() -> false,
new TimeSeriesAggregationBuilder("ts"),
value -> {},
PipelineAggregator.PipelineTree.EMPTY
);

InternalTimeSeries result = (InternalTimeSeries) first.reduce(List.of(first, second, third), context);
assertThat(result.getBuckets().get(0).key.utf8ToString(), equalTo("1"));
assertThat(result.getBuckets().get(0).getDocCount(), equalTo(5L));
assertThat(result.getBuckets().get(1).key.utf8ToString(), equalTo("10"));
assertThat(result.getBuckets().get(1).getDocCount(), equalTo(6L));
assertThat(result.getBuckets().get(2).key.utf8ToString(), equalTo("2"));
assertThat(result.getBuckets().get(2).getDocCount(), equalTo(3L));
assertThat(result.getBuckets().get(3).key.utf8ToString(), equalTo("3"));
assertThat(result.getBuckets().get(3).getDocCount(), equalTo(7L));
assertThat(result.getBuckets().get(4).key.utf8ToString(), equalTo("9"));
assertThat(result.getBuckets().get(4).getDocCount(), equalTo(9L));
}
}
