diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
index a4ad967b79602..088568fefc030 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java
@@ -8,12 +8,16 @@
 package org.elasticsearch.aggregations.bucket.timeseries;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
+import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.XContentBuilder;
 
@@ -47,11 +51,12 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalTimeSeries, InternalTimeSeries.InternalBucket> {
 
     public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket {
         protected final boolean keyed;
-        protected final Map<String, Object> key;
+        protected final BytesRef key;
+        // TODO: make computing docCount optional
        protected long docCount;
         protected InternalAggregations aggregations;
 
-        public InternalBucket(Map<String, Object> key, long docCount, InternalAggregations aggregations, boolean keyed) {
+        public InternalBucket(BytesRef key, long docCount, InternalAggregations aggregations, boolean keyed) {
             this.key = key;
             this.docCount = docCount;
             this.aggregations = aggregations;
@@ -63,26 +68,26 @@ public InternalBucket(Map<String, Object> key, long docCount, InternalAggregatio
          */
         public InternalBucket(StreamInput in, boolean keyed) throws IOException {
             this.keyed = keyed;
-            key = in.readOrderedMap(StreamInput::readString, StreamInput::readGenericValue);
+            key = in.readBytesRef();
             docCount = in.readVLong();
             aggregations = InternalAggregations.readFrom(in);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            out.writeMap(key, StreamOutput::writeString, StreamOutput::writeGenericValue);
+            out.writeBytesRef(key);
             out.writeVLong(docCount);
             aggregations.writeTo(out);
         }
 
         @Override
         public Map<String, Object> getKey() {
-            return key;
+            return TimeSeriesIdFieldMapper.decodeTsid(key);
         }
 
         @Override
         public String getKeyAsString() {
-            return key.toString();
+            return getKey().toString();
         }
 
         @Override
@@ -97,8 +102,10 @@ public InternalAggregations getAggregations() {
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            // Use the map key in the xcontent response:
+            var key = getKey();
             if (keyed) {
-                builder.startObject(getKeyAsString());
+                builder.startObject(key.toString());
             } else {
                 builder.startObject();
             }
@@ -187,38 +194,61 @@ protected void doWriteTo(StreamOutput out) throws IOException {
 
     @Override
     public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
-        // We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
-        // that in the future
-        Map<Map<String, Object>, List<InternalBucket>> bucketsList = null;
+        // TODO: optimize the single result case, either by adding an if check here and returning aggregations.get(0) or
+        // by overriding the mustReduceOnSingleInternalAgg() method
+        final int initialCapacity = aggregations.stream()
+            .map(value -> (InternalTimeSeries) value)
+            .mapToInt(value -> value.getBuckets().size())
+            .max()
+            .getAsInt();
+
+        final PriorityQueue<IteratorAndCurrent<InternalBucket>> pq = new PriorityQueue<>(aggregations.size()) {
+            @Override
+            protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurrent<InternalBucket> b) {
+                return a.current().key.compareTo(b.current().key) < 0;
+            }
+        };
         for (InternalAggregation aggregation : aggregations) {
             InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
-            if (bucketsList != null) {
-                for (InternalBucket bucket : timeSeries.buckets) {
-                    bucketsList.compute(bucket.key, (map, list) -> {
-                        if (list == null) {
-                            list = new ArrayList<>();
-                        }
-                        list.add(bucket);
-                        return list;
-                    });
-                }
-            } else {
-                bucketsList = new HashMap<>(timeSeries.buckets.size());
-                for (InternalTimeSeries.InternalBucket bucket : timeSeries.buckets) {
-                    List<InternalBucket> bucketList = new ArrayList<>();
-                    bucketList.add(bucket);
-                    bucketsList.put(bucket.key, bucketList);
-                }
+            if (timeSeries.buckets.isEmpty() == false) {
+                IteratorAndCurrent<InternalBucket> iterator = new IteratorAndCurrent<>(timeSeries.buckets.iterator());
+                pq.add(iterator);
             }
         }
 
-        reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
-        InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
-        for (Map.Entry<Map<String, Object>, List<InternalBucket>> bucketEntry : bucketsList.entrySet()) {
-            reduced.buckets.add(reduceBucket(bucketEntry.getValue(), reduceContext));
+        InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(initialCapacity), keyed, getMetadata());
+        List<InternalBucket> bucketsWithSameKey = new ArrayList<>(aggregations.size());
+        BytesRef prevTsid = null;
+        while (pq.size() > 0) {
+            reduceContext.consumeBucketsAndMaybeBreak(1);
+            bucketsWithSameKey.clear();
+
+            while (bucketsWithSameKey.isEmpty() || bucketsWithSameKey.get(0).key.equals(pq.top().current().key)) {
+                IteratorAndCurrent<InternalBucket> iterator = pq.top();
+                bucketsWithSameKey.add(iterator.current());
+                if (iterator.hasNext()) {
+                    iterator.next();
+                    pq.updateTop();
+                } else {
+                    pq.pop();
+                    if (pq.size() == 0) {
+                        break;
+                    }
+                }
+            }
+
+            InternalBucket reducedBucket;
+            if (bucketsWithSameKey.size() == 1) {
+                reducedBucket = bucketsWithSameKey.get(0);
+            } else {
+                reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
+            }
+            BytesRef tsid = reducedBucket.key;
+            assert prevTsid == null || tsid.compareTo(prevTsid) > 0;
+            reduced.buckets.add(reducedBucket);
+            prevTsid = tsid;
         }
         return reduced;
-
     }
 
     @Override
diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
index d6764dd97af19..2d1e451839865 100644
--- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
+++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java
@@ -10,7 +10,6 @@
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -32,7 +31,6 @@ public class TimeSeriesAggregator extends BucketsAggregator {
     protected final BytesKeyedBucketOrds bucketOrds;
     private final boolean keyed;
 
-    @SuppressWarnings("unchecked")
     public TimeSeriesAggregator(
         String name,
         AggregatorFactories factories,
@@ -49,16 +47,19 @@ public TimeSeriesAggregator(
 
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        BytesRef spare = new BytesRef();
         InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
-            BytesRef spareKey = new BytesRef();
             BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
             List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
+            BytesRef prev = null;
             while (ordsEnum.next()) {
                 long docCount = bucketDocCount(ordsEnum.ord());
-                ordsEnum.readValue(spareKey);
+                ordsEnum.readValue(spare);
+                assert prev == null || spare.compareTo(prev) > 0
+                    : "key [" + spare.utf8ToString() + "] is smaller than previous key [" + prev.utf8ToString() + "]";
                 InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
-                    TimeSeriesIdFieldMapper.decodeTsid(spareKey),
+                    prev = BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so we need to make a deep copy here.
                     docCount,
                     null,
                     keyed
diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
index 38cccb969c8bc..943e7f43c7543 100644
--- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
+++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java
@@ -8,12 +8,24 @@
 package org.elasticsearch.aggregations.bucket.timeseries;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.aggregations.bucket.AggregationMultiBucketAggregationTestCase;
+import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries.InternalBucket;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.xcontent.ContextParser;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -21,6 +33,7 @@
 import java.util.function.Predicate;
 
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
 
 public class InternalTimeSeriesTests extends AggregationMultiBucketAggregationTestCase<InternalTimeSeries> {
 
@@ -29,14 +42,25 @@ protected Map.Entry<String, ContextParser<Object, Aggregation>> getParser() {
         return Map.entry(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c));
     }
 
-    private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
+    private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations aggregations) {
         int numberOfBuckets = randomNumberOfBuckets();
-        List<InternalTimeSeries.InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
+        List<InternalBucket> bucketList = new ArrayList<>(numberOfBuckets);
         List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
         for (int j = 0; j < numberOfBuckets; j++) {
             long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
-            bucketList.add(new InternalTimeSeries.InternalBucket(keys.get(j), docCount, aggregations, keyed));
+            var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
+            for (var entry : keys.get(j).entrySet()) {
+                builder.addString(entry.getKey(), (String) entry.getValue());
+            }
+            try {
+                var key = builder.build().toBytesRef();
+                bucketList.add(new InternalBucket(key, docCount, aggregations, keyed));
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
         }
+        // The internal time series' reduce method expects for each shard level response that the buckets are sorted by tsid:
+        bucketList.sort(Comparator.comparing(o -> o.key));
         return bucketList;
     }
 
@@ -68,7 +92,7 @@ protected InternalTimeSeries createTestInstance(String name, Map<String, Object>
     protected void assertReduced(InternalTimeSeries reduced, List<InternalTimeSeries> inputs) {
         Map<Map<String, Object>, Long> keys = new HashMap<>();
         for (InternalTimeSeries in : inputs) {
-            for (InternalTimeSeries.InternalBucket bucket : in.getBuckets()) {
+            for (InternalBucket bucket : in.getBuckets()) {
                 keys.compute(bucket.getKey(), (k, v) -> {
                     if (v == null) {
                         return bucket.docCount;
@@ -79,7 +103,7 @@ protected void assertReduced(InternalTimeSeries reduced, List<InternalTimeSeries> inputs) {
             }
         }
         assertThat(
-            reduced.getBuckets().stream().map(InternalTimeSeries.InternalBucket::getKey).toArray(Object[]::new),
+            reduced.getBuckets().stream().map(InternalBucket::getKey).toArray(Object[]::new),
             arrayContainingInAnyOrder(keys.keySet().toArray(Object[]::new))
         );
     }
@@ -94,4 +118,58 @@ protected Class<ParsedTimeSeries> implementationClass() {
     protected Predicate<String> excludePathsFromXContentInsertion() {
         return s -> s.endsWith(".key");
     }
+
+    public void testReduceSimple() {
+        // A simple test to easily spot mistakes in the merge logic of the InternalTimeSeries#reduce(...) method.
+        InternalTimeSeries first = new InternalTimeSeries(
+            "ts",
+            List.of(
+                new InternalBucket(new BytesRef("1"), 3, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("10"), 6, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("2"), 2, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("9"), 5, InternalAggregations.EMPTY, false)
+            ),
+            false,
+            Map.of()
+        );
+        InternalTimeSeries second = new InternalTimeSeries(
+            "ts",
+            List.of(
+                new InternalBucket(new BytesRef("2"), 1, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("3"), 3, InternalAggregations.EMPTY, false)
+            ),
+            false,
+            Map.of()
+        );
+        InternalTimeSeries third = new InternalTimeSeries(
+            "ts",
+            List.of(
+                new InternalBucket(new BytesRef("1"), 2, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("3"), 4, InternalAggregations.EMPTY, false),
+                new InternalBucket(new BytesRef("9"), 4, InternalAggregations.EMPTY, false)
+            ),
+            false,
+            Map.of()
+        );
+        AggregationReduceContext context = new AggregationReduceContext.ForFinal(
+            new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
+            mockScriptService(),
+            () -> false,
+            new TimeSeriesAggregationBuilder("ts"),
+            value -> {},
+            PipelineAggregator.PipelineTree.EMPTY
+        );
+
+        InternalTimeSeries result = (InternalTimeSeries) first.reduce(List.of(first, second, third), context);
+        assertThat(result.getBuckets().get(0).key.utf8ToString(), equalTo("1"));
+        assertThat(result.getBuckets().get(0).getDocCount(), equalTo(5L));
+        assertThat(result.getBuckets().get(1).key.utf8ToString(), equalTo("10"));
+        assertThat(result.getBuckets().get(1).getDocCount(), equalTo(6L));
+        assertThat(result.getBuckets().get(2).key.utf8ToString(), equalTo("2"));
+        assertThat(result.getBuckets().get(2).getDocCount(), equalTo(3L));
+        assertThat(result.getBuckets().get(3).key.utf8ToString(), equalTo("3"));
+        assertThat(result.getBuckets().get(3).getDocCount(), equalTo(7L));
+        assertThat(result.getBuckets().get(4).key.utf8ToString(), equalTo("9"));
+        assertThat(result.getBuckets().get(4).getDocCount(), equalTo(9L));
+    }
 }
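Note on the approach (not part of the patch): the sketch below is a simplified, self-contained illustration of the reduce strategy the patch switches to. Because each shard-level response now delivers its buckets already sorted by tsid, the reduce phase can perform a k-way merge with a priority queue and only group adjacent buckets that share the same key, instead of hashing every bucket key into a map. The class, Bucket, Cursor, and merge names are hypothetical, and it uses the plain java.util.PriorityQueue rather than Lucene's; the inputs in main() mirror testReduceSimple() above.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Standalone sketch (hypothetical names, not Elasticsearch APIs): k-way merge of per-shard
// bucket lists that are already sorted by tsid, mirroring InternalTimeSeries#reduce(...) above.
class SortedBucketMergeSketch {

    record Bucket(String tsid, long docCount) {}

    // Mutable cursor over one shard's sorted bucket list.
    private static final class Cursor {
        final Iterator<Bucket> it;
        Bucket current;

        Cursor(Iterator<Bucket> it) {
            this.it = it;
            this.current = it.next();
        }
    }

    static List<Bucket> merge(List<List<Bucket>> shardResponses) {
        // Order cursors by the tsid of their current bucket.
        PriorityQueue<Cursor> pq = new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.current.tsid()));
        for (List<Bucket> buckets : shardResponses) {
            if (buckets.isEmpty() == false) {
                pq.add(new Cursor(buckets.iterator()));
            }
        }
        List<Bucket> reduced = new ArrayList<>();
        while (pq.isEmpty() == false) {
            // Drain every cursor whose current bucket has the smallest tsid and sum their doc counts.
            String tsid = pq.peek().current.tsid();
            long docCount = 0;
            while (pq.isEmpty() == false && pq.peek().current.tsid().equals(tsid)) {
                Cursor cursor = pq.poll();
                docCount += cursor.current.docCount();
                if (cursor.it.hasNext()) {
                    cursor.current = cursor.it.next();
                    pq.add(cursor); // re-insert the cursor with its next, strictly larger key
                }
            }
            reduced.add(new Bucket(tsid, docCount));
        }
        return reduced;
    }

    public static void main(String[] args) {
        // Same shard-level inputs and expected doc counts as testReduceSimple() in the diff above.
        List<Bucket> shard1 = List.of(new Bucket("1", 3), new Bucket("10", 6), new Bucket("2", 2), new Bucket("9", 5));
        List<Bucket> shard2 = List.of(new Bucket("2", 1), new Bucket("3", 3));
        List<Bucket> shard3 = List.of(new Bucket("1", 2), new Bucket("3", 4), new Bucket("9", 4));
        // Prints buckets in tsid order: 1=5, 10=6, 2=3, 3=7, 9=9
        System.out.println(merge(List.of(shard1, shard2, shard3)));
    }
}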