From f8dfc46411607c8e969fa2a5e591662b5c7facd1 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Tue, 8 Nov 2022 15:03:57 +0100
Subject: [PATCH 1/7] Change internal representation of bucket key of
 time_series agg

Currently, the key is a map, which can make reducing large responses
more memory intensive than it should be. Also, the data structures used
during the reduce are not backed by big arrays, so that memory isn't
accounted for.

This commit changes how the key is represented internally, by using a
BytesRef instead of a Map. It doesn't change how the key is represented
in the response. It also changes the reduce method to take advantage of
the fact that bucket keys are now bytes refs.

Relates to #74660
---
 .../timeseries/InternalTimeSeries.java        | 84 +++++++++++--------
 .../timeseries/TimeSeriesAggregator.java      |  5 +-
 .../timeseries/InternalTimeSeriesTests.java   | 14 +++-
 3 files changed, 65 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java
index 3b15f3fe3c2c1..e0fa5bb5350c4 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java
@@ -8,8 +8,13 @@
 
 package org.elasticsearch.search.aggregations.timeseries;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.common.util.LongObjectPagedHashMap;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -46,11 +51,12 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<Internal
-        protected final Map<String, Object> key;
+        protected final BytesRef key;
+        // TODO: make computing docCount optional
         protected long docCount;
         protected InternalAggregations aggregations;
 
-        public InternalBucket(Map<String, Object> key, long docCount, InternalAggregations aggregations, boolean keyed) {
+        public InternalBucket(BytesRef key, long docCount, InternalAggregations aggregations, boolean keyed) {
             this.key = key;
             this.docCount = docCount;
             this.aggregations = aggregations;
@@ -62,26 +68,26 @@ public InternalBucket(Map<String, Object> key, long docCount, InternalAggregatio
          */
         public InternalBucket(StreamInput in, boolean keyed) throws IOException {
             this.keyed = keyed;
-            key = in.readOrderedMap(StreamInput::readString, StreamInput::readGenericValue);
+            key = in.readBytesRef();
             docCount = in.readVLong();
             aggregations = InternalAggregations.readFrom(in);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            out.writeMap(key, StreamOutput::writeString, StreamOutput::writeGenericValue);
+            out.writeBytesRef(key);
             out.writeVLong(docCount);
             aggregations.writeTo(out);
         }
 
         @Override
         public Map<String, Object> getKey() {
-            return key;
+            return TimeSeriesIdFieldMapper.decodeTsid(key);
         }
 
         @Override
         public String getKeyAsString() {
-            return key.toString();
+            return getKey().toString();
         }
 
         @Override
@@ -96,8 +102,10 @@ public InternalAggregations getAggregations() {
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            // Use map key in the xcontent response:
+            var key = getKey();
             if (keyed) {
-                builder.startObject(getKeyAsString());
+                builder.startObject(key.toString());
             } else {
                 builder.startObject();
             }
@@ -186,38 +194,46 @@ protected void doWriteTo(StreamOutput out) throws IOException {
 
     @Override
     public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
-        // We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
-        // that in the future
-        Map<Map<String, Object>, List<InternalBucket>> bucketsList = null;
-        for (InternalAggregation aggregation : aggregations) {
-            InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
-            if (bucketsList != null) {
-                for (InternalBucket bucket : timeSeries.buckets) {
-                    bucketsList.compute(bucket.key, (map, list) -> {
-                        if (list == null) {
-                            list = new ArrayList<>();
+        // TODO: optimize single result case either by having a if check here and return aggregations.get(0) or
+        // by overwriting the mustReduceOnSingleInternalAgg() method
+
+        final BigArrays bigArrays = reduceContext.bigArrays();
+        final int initialCapacity = aggregations.stream()
+            .map(value -> (InternalTimeSeries) value)
+            .mapToInt(value -> value.getBuckets().size())
+            .max()
+            .getAsInt();
+        try (LongObjectPagedHashMap<List<InternalBucket>> tsKeyToBuckets = new LongObjectPagedHashMap<>(initialCapacity, bigArrays)) {
+            final int numTsids;
+            // We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
+            // that in the future
+            try (BytesRefHash tsids = new BytesRefHash(initialCapacity, bigArrays)) {
+                for (int i = 0; i < aggregations.size(); i++) {
+                    InternalTimeSeries timeSeries = (InternalTimeSeries) aggregations.get(i);
+                    for (int j = 0; j < timeSeries.getBuckets().size(); j++) {
+                        InternalBucket bucket = timeSeries.getBuckets().get(j);
+                        long key = tsids.add(bucket.key);
+                        List<InternalBucket> buckets;
+                        if (key < 0) {
+                            key = -1 - key;
+                            buckets = tsKeyToBuckets.get(key);
+                        } else {
+                            buckets = new ArrayList<>();
                         }
-                        list.add(bucket);
-                        return list;
-                    });
-                }
-            } else {
-                bucketsList = new HashMap<>(timeSeries.buckets.size());
-                for (InternalTimeSeries.InternalBucket bucket : timeSeries.buckets) {
-                    List<InternalBucket> bucketList = new ArrayList<>();
-                    bucketList.add(bucket);
-                    bucketsList.put(bucket.key, bucketList);
+                        buckets.add(bucket);
+                        tsKeyToBuckets.put(key, buckets);
+                    }
                 }
+                numTsids = (int) tsids.size();
             }
-        }
-        reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
-        InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
-        for (Map.Entry<Map<String, Object>, List<InternalBucket>> bucketEntry : bucketsList.entrySet()) {
-            reduced.buckets.add(reduceBucket(bucketEntry.getValue(), reduceContext));
+            reduceContext.consumeBucketsAndMaybeBreak(numTsids);
+            InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(numTsids), keyed, getMetadata());
+            for (LongObjectPagedHashMap.Cursor<List<InternalBucket>> tsKeyToBucket : tsKeyToBuckets) {
+                reduced.buckets.add(reduceBucket(tsKeyToBucket.value, reduceContext));
+            }
+            return reduced;
         }
-        return reduced;
-    }
 
     @Override

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
index 8a077b100e1ea..f225225d25aec 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
@@ -10,7 +10,6 @@
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -51,14 +50,14 @@ public TimeSeriesAggregator(
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
-            BytesRef spareKey = new BytesRef();
             BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
             List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
             while (ordsEnum.next()) {
                 long docCount = bucketDocCount(ordsEnum.ord());
+                BytesRef spareKey = new BytesRef();
                 ordsEnum.readValue(spareKey);
                 InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
-                    TimeSeriesIdFieldMapper.decodeTsid(spareKey),
+                    spareKey,
                     docCount,
                     null,
                     keyed

diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java
index 5f374ca9c7425..200e18505d667 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java
@@ -8,9 +8,12 @@
 
 package org.elasticsearch.search.aggregations.timeseries;
 
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,7 +31,16 @@ private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, Int
         List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
         for (int j = 0; j < numberOfBuckets; j++) {
             long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
-            bucketList.add(new InternalTimeSeries.InternalBucket(keys.get(j), docCount, aggregations, keyed));
+            var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
+            for (var entry : keys.get(j).entrySet()) {
+                builder.addString(entry.getKey(), (String) entry.getValue());
+            }
+            try {
+                var key = builder.build().toBytesRef();
+                bucketList.add(new InternalTimeSeries.InternalBucket(key, docCount, aggregations, keyed));
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
         }
         return bucketList;
     }
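Note: the new reduce method above relies on a convention of BytesRefHash#add: it returns a non-negative id the first time a key is seen, and -1 - existingId when the key is already present. That is how the same tsid arriving from two indices is detected. A minimal sketch of that contract, assuming the Elasticsearch server classes are on the classpath; BigArrays.NON_RECYCLING_INSTANCE is used here only to keep the example self-contained (the real code uses reduceContext.bigArrays() so the hash is accounted for):

    import org.apache.lucene.util.BytesRef;
    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.BytesRefHash;

    public class BytesRefHashDedupSketch {
        public static void main(String[] args) {
            // BytesRefHash is releasable, so close it via try-with-resources,
            // mirroring how the reduce method above uses it.
            try (BytesRefHash tsids = new BytesRefHash(16, BigArrays.NON_RECYCLING_INSTANCE)) {
                long first = tsids.add(new BytesRef("tsid-a")); // >= 0: first occurrence of this key
                long dup = tsids.add(new BytesRef("tsid-a"));   // < 0: key was already present
                long existingId = -1 - dup;                     // recover the id of the first insert
                System.out.println(first == existingId);        // prints: true
            }
        }
    }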
From a845117d57de215fde37f0abccb9a9d46bd12d5d Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Tue, 8 Nov 2022 17:12:14 +0100
Subject: [PATCH 2/7] rename variable

---
 .../aggregations/timeseries/TimeSeriesAggregator.java | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
index f225225d25aec..0473c46e95018 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
@@ -54,14 +54,9 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
             List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
             while (ordsEnum.next()) {
                 long docCount = bucketDocCount(ordsEnum.ord());
-                BytesRef spareKey = new BytesRef();
-                ordsEnum.readValue(spareKey);
-                InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
-                    spareKey,
-                    docCount,
-                    null,
-                    keyed
-                );
+                BytesRef key = new BytesRef();
+                ordsEnum.readValue(key);
+                InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(key, docCount, null, keyed);
                 bucket.bucketOrd = ordsEnum.ord();
                 buckets.add(bucket);
             }

From e92af1161536a2591b88d6acd823aa1ddc9c3666 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Wed, 9 Nov 2022 10:31:46 +0100
Subject: [PATCH 3/7] improve reduce logic and make use of the fact that
 buckets are sorted in tsid order in the shard level responses

This allows merging and detecting the same TSIDs across multiple shard
responses without first deduping the tsids in a data structure.
---
 .../bucket/timeseries/InternalTimeSeries.java | 74 ++++++++++--------
 .../timeseries/InternalTimeSeriesTests.java   | 75 +++++++++++++++++--
 2 files changed, 111 insertions(+), 38 deletions(-)

diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
index e4ff5229590a7..1cb2a235a2779 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
@@ -11,14 +11,12 @@
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.BytesRefHash;
-import org.elasticsearch.common.util.LongObjectPagedHashMap;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.XContentBuilder;
 
@@ -197,44 +195,56 @@ protected void doWriteTo(StreamOutput out) throws IOException {
     public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
         // TODO: optimize single result case either by having a if check here and return aggregations.get(0) or
         // by overwriting the mustReduceOnSingleInternalAgg() method
-
-        final BigArrays bigArrays = reduceContext.bigArrays();
         final int initialCapacity = aggregations.stream()
             .map(value -> (InternalTimeSeries) value)
             .mapToInt(value -> value.getBuckets().size())
             .max()
             .getAsInt();
-        try (LongObjectPagedHashMap<List<InternalBucket>> tsKeyToBuckets = new LongObjectPagedHashMap<>(initialCapacity, bigArrays)) {
-            final int numTsids;
-            // We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
-            // that in the future
-            try (BytesRefHash tsids = new BytesRefHash(initialCapacity, bigArrays)) {
-                for (int i = 0; i < aggregations.size(); i++) {
-                    InternalTimeSeries timeSeries = (InternalTimeSeries) aggregations.get(i);
-                    for (int j = 0; j < timeSeries.getBuckets().size(); j++) {
-                        InternalBucket bucket = timeSeries.getBuckets().get(j);
-                        long key = tsids.add(bucket.key);
-                        List<InternalBucket> buckets;
-                        if (key < 0) {
-                            key = -1 - key;
-                            buckets = tsKeyToBuckets.get(key);
-                        } else {
-                            buckets = new ArrayList<>();
-                        }
-                        buckets.add(bucket);
-                        tsKeyToBuckets.put(key, buckets);
-                    }
-                }
-                numTsids = (int) tsids.size();
+
+        final List<IteratorAndCurrent<InternalBucket>> iterators = new ArrayList<>(aggregations.size());
+        for (InternalAggregation aggregation : aggregations) {
+            InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
+            if (timeSeries.buckets.isEmpty() == false) {
+                IteratorAndCurrent<InternalBucket> iterator = new IteratorAndCurrent<>(timeSeries.buckets.iterator());
+                iterators.add(iterator);
             }
+        }
 
-            reduceContext.consumeBucketsAndMaybeBreak(numTsids);
-            InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(numTsids), keyed, getMetadata());
-            for (LongObjectPagedHashMap.Cursor<List<InternalBucket>> tsKeyToBucket : tsKeyToBuckets) {
-                reduced.buckets.add(reduceBucket(tsKeyToBucket.value, reduceContext));
+        InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata());
+        List<IteratorAndCurrent<InternalBucket>> competitiveIterators = new ArrayList<>(iterators.size());
+        while (iterators.isEmpty() == false) {
+            reduceContext.consumeBucketsAndMaybeBreak(1);
+            IteratorAndCurrent<InternalBucket> competitive = iterators.get(0);
+            competitiveIterators.clear();
+            competitiveIterators.add(competitive);
+            for (int i = 1; i < iterators.size(); i++) {
+                IteratorAndCurrent<InternalBucket> contender = iterators.get(i);
+                int cmp = contender.current().key.compareTo(competitive.current().key);
+                if (cmp < 0) {
+                    competitive = contender;
+                    competitiveIterators.clear();
+                    competitiveIterators.add(contender);
+                } else if (cmp == 0) {
+                    competitiveIterators.add(contender);
+                }
+            }
+            InternalBucket reducedBucket;
+            if (competitiveIterators.size() == 1) {
+                reducedBucket = competitive.current();
+            } else {
+                List<InternalBucket> buckets = competitiveIterators.stream().map(IteratorAndCurrent::current).toList();
+                reducedBucket = reduceBucket(buckets, reduceContext);
+            }
+            reduced.buckets.add(reducedBucket);
+            for (IteratorAndCurrent<InternalBucket> iterator : competitiveIterators) {
+                if (iterator.hasNext()) {
+                    iterator.next();
+                } else {
+                    iterators.remove(iterator);
+                }
             }
-            return reduced;
         }
+        return reduced;
     }
 
     @Override

diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
index 9c1e72b56e1c4..7c4621b06a328 100644
--- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
+++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
@@ -8,10 +8,18 @@
 
 package org.elasticsearch.aggregations.bucket.timeseries;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.aggregations.bucket.AggregationMultiBucketAggregationTestCase;
-import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries.InternalBucket;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.xcontent.ContextParser;
 
 import java.io.IOException;
@@ -24,6 +32,7 @@
 import java.util.function.Predicate;
 
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
 
 public class InternalTimeSeriesTests extends AggregationMultiBucketAggregationTestCase<InternalTimeSeries> {
 
@@ -32,9 +41,9 @@ protected Map.Entry<String, ContextParser<Object, Aggregation>> getParser() {
         return Map.entry(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c));
     }
 
-    private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
+    private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
         int numberOfBuckets = randomNumberOfBuckets();
-        List<InternalTimeSeries.InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
+        List<InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
         List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
         for (int j = 0; j < numberOfBuckets; j++) {
             long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
@@ -44,7 +53,7 @@ private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations
             }
             try {
                 var key = builder.build().toBytesRef();
-                bucketList.add(new InternalTimeSeries.InternalBucket(key, docCount, aggregations, keyed));
+                bucketList.add(new InternalBucket(key, docCount, aggregations, keyed));
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }
@@ -80,7 +89,7 @@ protected InternalTimeSeries createTestInstance(String name, Map<String, Object
     protected void assertReduced(InternalTimeSeries reduced, List<InternalTimeSeries> inputs) {
         Map<Map<String, Object>, Long> keys = new HashMap<>();
         for (InternalTimeSeries in : inputs) {
-            for (InternalTimeSeries.InternalBucket bucket : in.getBuckets()) {
+            for (InternalBucket bucket : in.getBuckets()) {
                 keys.compute(bucket.getKey(), (k, v) -> {
                     if (v == null) {
                         return bucket.docCount;
@@ -91,7 +100,7 @@ protected void assertReduced(InternalTimeSeries reduced, List<InternalTimeSerie
         }
        assertThat(
-            reduced.getBuckets().stream().map(InternalTimeSeries.InternalBucket::getKey).toArray(Object[]::new),
+            reduced.getBuckets().stream().map(InternalBucket::getKey).toArray(Object[]::new),
             arrayContainingInAnyOrder(keys.keySet().toArray(Object[]::new))
         );
     }
@@ … @@ implementationClass() {
     protected Predicate<String> excludePathsFromXContentInsertion() {
         return s -> s.endsWith(".key");
     }
+
+    public void testReduceSimple() {
+        // a simple test, to easily spot mistakes in the merge logic in the InternalTimeSeries#reduce(...) method
+        InternalTimeSeries first = new InternalTimeSeries(
+            "ts",
+            List.of(
+                new InternalBucket(new BytesRef("1"), 3, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("2"), 2, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("9"), 5, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("10"), 6, InternalAggregations.EMPTY, false)
+            ),
+            false,
+            Map.of()
+        );
+        InternalTimeSeries second = new InternalTimeSeries(
+            "ts",
+            List.of(
+                new InternalBucket(new BytesRef("2"), 1, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("3"), 3, InternalAggregations.EMPTY, false)
+            ),
+            false,
+            Map.of()
+        );
+        InternalTimeSeries third = new InternalTimeSeries(
+            "ts",
+            List.of(
+                new InternalBucket(new BytesRef("1"), 2, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("3"), 4, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("9"), 4, InternalAggregations.EMPTY, false)
+            ),
+            false,
+            Map.of()
+        );
+        AggregationReduceContext context = new AggregationReduceContext.ForFinal(
+            new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
+            mockScriptService(),
+            () -> false,
+            new TimeSeriesAggregationBuilder("ts"),
+            value -> {},
+            PipelineAggregator.PipelineTree.EMPTY
+        );
+
+        InternalTimeSeries result = (InternalTimeSeries) first.reduce(List.of(first, second, third), context);
+        assertThat(result.getBuckets().get(0).key.utf8ToString(), equalTo("1"));
+        assertThat(result.getBuckets().get(0).getDocCount(), equalTo(5L));
+        assertThat(result.getBuckets().get(1).key.utf8ToString(), equalTo("2"));
+        assertThat(result.getBuckets().get(1).getDocCount(), equalTo(3L));
+        assertThat(result.getBuckets().get(2).key.utf8ToString(), equalTo("3"));
+        assertThat(result.getBuckets().get(2).getDocCount(), equalTo(7L));
+        assertThat(result.getBuckets().get(3).key.utf8ToString(), equalTo("9"));
+        assertThat(result.getBuckets().get(3).getDocCount(), equalTo(9L));
+        assertThat(result.getBuckets().get(4).key.utf8ToString(), equalTo("10"));
+        assertThat(result.getBuckets().get(4).getDocCount(), equalTo(6L));
+    }
 }
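Note: the merge strategy this patch introduces can be shown in isolation: keep one iterator per shard response (each pre-sorted by key), find the smallest current key, and reduce all buckets that tie on it. The following self-contained sketch uses a hypothetical Bucket record and simply sums doc counts where the real code calls reduceBucket(...):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class SortedMergeSketch {
        // Hypothetical stand-in for a (tsid, docCount) bucket; not the aggregation framework's type.
        record Bucket(String key, long docCount) {}

        // Merges shard responses whose buckets are pre-sorted by key, summing doc counts of equal keys.
        static List<Bucket> reduce(List<List<Bucket>> shardResponses) {
            List<Iterator<Bucket>> iterators = new ArrayList<>();
            List<Bucket> currents = new ArrayList<>();
            for (List<Bucket> response : shardResponses) {
                Iterator<Bucket> it = response.iterator();
                if (it.hasNext()) {
                    iterators.add(it);
                    currents.add(it.next());
                }
            }
            List<Bucket> reduced = new ArrayList<>();
            while (iterators.isEmpty() == false) {
                // Find the smallest current key; every iterator positioned on it is "competitive".
                String min = currents.get(0).key();
                for (Bucket b : currents) {
                    if (b.key().compareTo(min) < 0) {
                        min = b.key();
                    }
                }
                long docCount = 0;
                for (int i = 0; i < iterators.size(); ) {
                    if (currents.get(i).key().equals(min)) {
                        docCount += currents.get(i).docCount();
                        if (iterators.get(i).hasNext()) {
                            currents.set(i, iterators.get(i).next()); // advance past the consumed bucket
                            i++;
                        } else {
                            iterators.remove(i); // exhausted iterators drop out of the merge
                            currents.remove(i);
                        }
                    } else {
                        i++;
                    }
                }
                reduced.add(new Bucket(min, docCount));
            }
            return reduced;
        }

        public static void main(String[] args) {
            System.out.println(reduce(List.of(
                List.of(new Bucket("1", 3), new Bucket("2", 2)),
                List.of(new Bucket("2", 1), new Bucket("3", 3))
            ))); // [Bucket[key=1, docCount=3], Bucket[key=2, docCount=3], Bucket[key=3, docCount=3]]
        }
    }

Unlike this sketch, the real reduce emits InternalBucket instances and defers the per-tsid combination to reduceBucket(...), but the iterator bookkeeping is the same idea.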
From 0d20460c6a4d7b3bab9d288add6a80fff2343d73 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Wed, 9 Nov 2022 15:16:25 +0100
Subject: [PATCH 4/7] added assertions that buckets are sorted by tsid when
 building the shard level agg response, and that the reduced buckets are
 sorted by tsid as well

---
 .../bucket/timeseries/InternalTimeSeries.java |  4 ++++
 .../timeseries/TimeSeriesAggregator.java      |  5 +++-
 .../timeseries/InternalTimeSeriesTests.java   | 23 +++++++++++--------
 3 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
index 1cb2a235a2779..11c730f943842 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
@@ -212,6 +212,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
 
         InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata());
         List<IteratorAndCurrent<InternalBucket>> competitiveIterators = new ArrayList<>(iterators.size());
+        BytesRef prevTsid = null;
         while (iterators.isEmpty() == false) {
             reduceContext.consumeBucketsAndMaybeBreak(1);
             IteratorAndCurrent<InternalBucket> competitive = iterators.get(0);
@@ -235,6 +236,8 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
                 List<InternalBucket> buckets = competitiveIterators.stream().map(IteratorAndCurrent::current).toList();
                 reducedBucket = reduceBucket(buckets, reduceContext);
             }
+            BytesRef tsid = reducedBucket.key;
+            assert prevTsid == null || tsid.compareTo(prevTsid) > 0;
             reduced.buckets.add(reducedBucket);
             for (IteratorAndCurrent<InternalBucket> iterator : competitiveIterators) {
                 if (iterator.hasNext()) {
@@ -243,6 +246,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
                     iterators.remove(iterator);
                 }
             }
+            prevTsid = tsid;
         }
         return reduced;
     }

diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
index af1980a8be15b..3fe2294f2f07e 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
@@ -31,7 +31,6 @@ public class TimeSeriesAggregator extends BucketsAggregator {
     protected final BytesKeyedBucketOrds bucketOrds;
     private final boolean keyed;
 
-    @SuppressWarnings("unchecked")
     public TimeSeriesAggregator(
         String name,
         AggregatorFactories factories,
@@ -52,13 +51,17 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
             List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
+            BytesRef prev = null;
             while (ordsEnum.next()) {
                 long docCount = bucketDocCount(ordsEnum.ord());
                 BytesRef key = new BytesRef();
                 ordsEnum.readValue(key);
+                assert prev == null || key.compareTo(prev) > 0
+                    : "key [" + key.utf8ToString() + "] is smaller than previous key [" + prev.utf8ToString() + "]";
                 InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(key, docCount, null, keyed);
                 bucket.bucketOrd = ordsEnum.ord();
                 buckets.add(bucket);
+                prev = key;
             }
             allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]);
         }

diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
index 7c4621b06a328..943e7f43c7543 100644
--- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
+++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
@@ -25,6 +25,7 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +59,8 @@ private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations a
                 throw new UncheckedIOException(e);
             }
         }
+        // The internal time series' reduce method expects the buckets of each shard level response to be sorted by tsid:
+        bucketList.sort(Comparator.comparing(o -> o.key));
         return bucketList;
     }
 
@@ -121,9 +124,9 @@ public void testReduceSimple() {
             "ts",
             List.of(
                 new InternalBucket(new BytesRef("1"), 3, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("10"), 6, InternalAggregations.EMPTY, false),
                 new InternalBucket(new BytesRef("2"), 2, InternalAggregations.EMPTY, false),
-                new InternalBucket(new BytesRef("9"), 5, InternalAggregations.EMPTY, false),
-                new InternalBucket(new BytesRef("10"), 6, InternalAggregations.EMPTY, false)
+                new InternalBucket(new BytesRef("9"), 5, InternalAggregations.EMPTY, false)
             ),
             false,
             Map.of()
@@ -159,13 +162,13 @@ public void testReduceSimple() {
         InternalTimeSeries result = (InternalTimeSeries) first.reduce(List.of(first, second, third), context);
         assertThat(result.getBuckets().get(0).key.utf8ToString(), equalTo("1"));
         assertThat(result.getBuckets().get(0).getDocCount(), equalTo(5L));
-        assertThat(result.getBuckets().get(1).key.utf8ToString(), equalTo("2"));
-        assertThat(result.getBuckets().get(1).getDocCount(), equalTo(3L));
-        assertThat(result.getBuckets().get(2).key.utf8ToString(), equalTo("3"));
-        assertThat(result.getBuckets().get(2).getDocCount(), equalTo(7L));
-        assertThat(result.getBuckets().get(3).key.utf8ToString(), equalTo("9"));
-        assertThat(result.getBuckets().get(3).getDocCount(), equalTo(9L));
-        assertThat(result.getBuckets().get(4).key.utf8ToString(), equalTo("10"));
-        assertThat(result.getBuckets().get(4).getDocCount(), equalTo(6L));
+        assertThat(result.getBuckets().get(1).key.utf8ToString(), equalTo("10"));
+        assertThat(result.getBuckets().get(1).getDocCount(), equalTo(6L));
+        assertThat(result.getBuckets().get(2).key.utf8ToString(), equalTo("2"));
+        assertThat(result.getBuckets().get(2).getDocCount(), equalTo(3L));
+        assertThat(result.getBuckets().get(3).key.utf8ToString(), equalTo("3"));
+        assertThat(result.getBuckets().get(3).getDocCount(), equalTo(7L));
+        assertThat(result.getBuckets().get(4).key.utf8ToString(), equalTo("9"));
+        assertThat(result.getBuckets().get(4).getDocCount(), equalTo(9L));
     }
 }
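Note: the reordered expectations in this patch follow directly from how BytesRef compares: byte by byte, unsigned, i.e. lexicographically for ASCII, so "10" sorts before "2" and after "1". A quick demo, needing only Lucene's BytesRef on the classpath:

    import org.apache.lucene.util.BytesRef;

    public class BytesRefOrderSketch {
        public static void main(String[] args) {
            // BytesRef compares unsigned byte by byte, so ordering is lexicographic,
            // not numeric: the first byte '1' (0x31) vs '2' (0x32) decides "10" vs "2".
            System.out.println(new BytesRef("1").compareTo(new BytesRef("10")) < 0); // true
            System.out.println(new BytesRef("10").compareTo(new BytesRef("2")) < 0); // true
            System.out.println(new BytesRef("2").compareTo(new BytesRef("9")) < 0);  // true
        }
    }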
From c45caecdecd8da4061a2ce27960121d6fe88dec3 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Thu, 10 Nov 2022 08:53:47 +0100
Subject: [PATCH 5/7] use priority queue

---
 .../bucket/timeseries/InternalTimeSeries.java | 54 +++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
index 11c730f943842..4a7f2d3170b15 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
@@ -9,6 +9,7 @@
 package org.elasticsearch.aggregations.bucket.timeseries;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -201,51 +202,50 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
             .max()
             .getAsInt();
 
-        final List<IteratorAndCurrent<InternalBucket>> iterators = new ArrayList<>(aggregations.size());
+        final PriorityQueue<IteratorAndCurrent<InternalBucket>> pq = new PriorityQueue<>(aggregations.size()) {
+            @Override
+            protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurrent<InternalBucket> b) {
+                return a.current().key.compareTo(b.current().key) < 0;
+            }
+        };
         for (InternalAggregation aggregation : aggregations) {
             InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
             if (timeSeries.buckets.isEmpty() == false) {
                 IteratorAndCurrent<InternalBucket> iterator = new IteratorAndCurrent<>(timeSeries.buckets.iterator());
-                iterators.add(iterator);
+                pq.add(iterator);
             }
         }
 
         InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata());
-        List<IteratorAndCurrent<InternalBucket>> competitiveIterators = new ArrayList<>(iterators.size());
+        List<InternalBucket> bucketsWithSameKey = new ArrayList<>(aggregations.size());
         BytesRef prevTsid = null;
-        while (iterators.isEmpty() == false) {
+        while (pq.size() > 0) {
             reduceContext.consumeBucketsAndMaybeBreak(1);
-            IteratorAndCurrent<InternalBucket> competitive = iterators.get(0);
-            competitiveIterators.clear();
-            competitiveIterators.add(competitive);
-            for (int i = 1; i < iterators.size(); i++) {
-                IteratorAndCurrent<InternalBucket> contender = iterators.get(i);
-                int cmp = contender.current().key.compareTo(competitive.current().key);
-                if (cmp < 0) {
-                    competitive = contender;
-                    competitiveIterators.clear();
-                    competitiveIterators.add(contender);
-                } else if (cmp == 0) {
-                    competitiveIterators.add(contender);
+            bucketsWithSameKey.clear();
+
+            while (bucketsWithSameKey.isEmpty() || bucketsWithSameKey.get(0).key.equals(pq.top().current().key)) {
+                IteratorAndCurrent<InternalBucket> iterator = pq.top();
+                bucketsWithSameKey.add(iterator.current());
+                if (iterator.hasNext()) {
+                    iterator.next();
+                } else {
+                    pq.pop();
+                    if (pq.size() == 0) {
+                        break;
+                    }
                 }
+                pq.updateTop();
             }
+
             InternalBucket reducedBucket;
-            if (competitiveIterators.size() == 1) {
-                reducedBucket = competitive.current();
+            if (bucketsWithSameKey.size() == 1) {
+                reducedBucket = bucketsWithSameKey.get(0);
             } else {
-                List<InternalBucket> buckets = competitiveIterators.stream().map(IteratorAndCurrent::current).toList();
-                reducedBucket = reduceBucket(buckets, reduceContext);
+                reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
             }
             BytesRef tsid = reducedBucket.key;
             assert prevTsid == null || tsid.compareTo(prevTsid) > 0;
             reduced.buckets.add(reducedBucket);
-            for (IteratorAndCurrent<InternalBucket> iterator : competitiveIterators) {
-                if (iterator.hasNext()) {
-                    iterator.next();
-                } else {
-                    iterators.remove(iterator);
-                }
-            }
             prevTsid = tsid;
         }
         return reduced;
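Note: this patch swaps the linear scan over competitive iterators for Lucene's org.apache.lucene.util.PriorityQueue, which is given a fixed capacity up front and ordered by overriding lessThan(); top(), pop() and updateTop() then give O(log n) access to the iterator with the smallest current tsid. A minimal sketch of that API, with plain strings standing in for tsids:

    import org.apache.lucene.util.PriorityQueue;

    public class LessThanSketch {
        public static void main(String[] args) {
            // Lucene's PriorityQueue is abstract: ordering is defined by lessThan(),
            // just like the anonymous subclass in the reduce method above.
            PriorityQueue<String> pq = new PriorityQueue<>(4) {
                @Override
                protected boolean lessThan(String a, String b) {
                    return a.compareTo(b) < 0;
                }
            };
            pq.add("2");
            pq.add("10");
            pq.add("9");
            System.out.println(pq.top()); // "10" — smallest in lexicographic (BytesRef-like) order
            pq.pop();
            System.out.println(pq.top()); // "2"
        }
    }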
From dcbb71cf624d0f4ed433486057b91a6d1da91e4c Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Thu, 10 Nov 2022 08:54:44 +0100
Subject: [PATCH 6/7] make a deep copy of bytes ref, otherwise closing
 bucketOrds may corrupt the bucket keys

---
 .../bucket/timeseries/TimeSeriesAggregator.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
index 3fe2294f2f07e..2d1e451839865 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
@@ -47,6 +47,7 @@ public TimeSeriesAggregator(
 
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        BytesRef spare = new BytesRef();
         InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
@@ -54,14 +55,17 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
             BytesRef prev = null;
             while (ordsEnum.next()) {
                 long docCount = bucketDocCount(ordsEnum.ord());
-                BytesRef key = new BytesRef();
-                ordsEnum.readValue(key);
-                assert prev == null || key.compareTo(prev) > 0
-                    : "key [" + key.utf8ToString() + "] is smaller than previous key [" + prev.utf8ToString() + "]";
-                InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(key, docCount, null, keyed);
+                ordsEnum.readValue(spare);
+                assert prev == null || spare.compareTo(prev) > 0
+                    : "key [" + spare.utf8ToString() + "] is smaller than previous key [" + prev.utf8ToString() + "]";
+                InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
+                    prev = BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so we need to make a deep copy here.
+                    docCount,
+                    null,
+                    keyed
+                );
                 bucket.bucketOrd = ordsEnum.ord();
                 buckets.add(bucket);
-                prev = key;
             }
             allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]);
         }

From 10c4339e88983fded4f586dcdf01c0669f6e6147 Mon Sep 17 00:00:00 2001
From: Martijn van Groningen
Date: Thu, 10 Nov 2022 13:13:53 +0100
Subject: [PATCH 7/7] only update top if the iterator advances; if the pq is
 popped then updating top isn't needed

---
 .../aggregations/bucket/timeseries/InternalTimeSeries.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
index 4a7f2d3170b15..088568fefc030 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
@@ -228,13 +228,13 @@ protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurr
                 bucketsWithSameKey.add(iterator.current());
                 if (iterator.hasNext()) {
                     iterator.next();
+                    pq.updateTop();
                 } else {
                     pq.pop();
                     if (pq.size() == 0) {
                         break;
                     }
                 }
-                pq.updateTop();
             }
 
             InternalBucket reducedBucket;