diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java
index 3b15f3fe3c2c1..e0fa5bb5350c4 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java
@@ -8,8 +8,13 @@
 
 package org.elasticsearch.search.aggregations.timeseries;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.common.util.LongObjectPagedHashMap;
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -46,11 +51,12 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalTimeSeries, InternalTimeSeries.InternalBucket> {
-        protected final Map<String, Object> key;
+        protected final BytesRef key;
+        // TODO: make computing docCount optional
         protected long docCount;
         protected InternalAggregations aggregations;
 
-        public InternalBucket(Map<String, Object> key, long docCount, InternalAggregations aggregations, boolean keyed) {
+        public InternalBucket(BytesRef key, long docCount, InternalAggregations aggregations, boolean keyed) {
             this.key = key;
             this.docCount = docCount;
             this.aggregations = aggregations;
@@ -62,26 +68,26 @@ public InternalBucket(Map<String, Object> key, long docCount, InternalAggregatio
          */
        public InternalBucket(StreamInput in, boolean keyed) throws IOException {
             this.keyed = keyed;
-            key = in.readOrderedMap(StreamInput::readString, StreamInput::readGenericValue);
+            key = in.readBytesRef();
             docCount = in.readVLong();
             aggregations = InternalAggregations.readFrom(in);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            out.writeMap(key, StreamOutput::writeString, StreamOutput::writeGenericValue);
+            out.writeBytesRef(key);
             out.writeVLong(docCount);
             aggregations.writeTo(out);
         }
 
         @Override
         public Map<String, Object> getKey() {
-            return key;
+            return TimeSeriesIdFieldMapper.decodeTsid(key);
         }
 
         @Override
         public String getKeyAsString() {
-            return key.toString();
+            return getKey().toString();
         }
 
         @Override
@@ -96,8 +102,10 @@ public InternalAggregations getAggregations() {
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            // Use map key in the xcontent response:
+            var key = getKey();
             if (keyed) {
-                builder.startObject(getKeyAsString());
+                builder.startObject(key.toString());
             } else {
                 builder.startObject();
             }
@@ -186,38 +194,46 @@ protected void doWriteTo(StreamOutput out) throws IOException {
 
     @Override
     public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
-        // We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
-        // that in the future
-        Map<Map<String, Object>, List<InternalBucket>> bucketsList = null;
-        for (InternalAggregation aggregation : aggregations) {
-            InternalTimeSeries timeSeries = (InternalTimeSeries) aggregation;
-            if (bucketsList != null) {
-                for (InternalBucket bucket : timeSeries.buckets) {
-                    bucketsList.compute(bucket.key, (map, list) -> {
-                        if (list == null) {
-                            list = new ArrayList<>();
+        // TODO: optimize single result case either by having a if check here and return aggregations.get(0) or
+        // by overwriting the mustReduceOnSingleInternalAgg() method
+
+        final BigArrays bigArrays = reduceContext.bigArrays();
+        final int initialCapacity = aggregations.stream()
+            .map(value -> (InternalTimeSeries) value)
+            .mapToInt(value -> value.getBuckets().size())
+            .max()
+            .getAsInt();
+        try (LongObjectPagedHashMap<List<InternalBucket>> tsKeyToBuckets = new LongObjectPagedHashMap<>(initialCapacity, bigArrays)) {
+            final int numTsids;
+            // We still need to reduce in case we got the same time series in 2 different indices, but we should be able to optimize
+            // that in the future
+            try (BytesRefHash tsids = new BytesRefHash(initialCapacity, bigArrays)) {
+                for (int i = 0; i < aggregations.size(); i++) {
+                    InternalTimeSeries timeSeries = (InternalTimeSeries) aggregations.get(i);
+                    for (int j = 0; j < timeSeries.getBuckets().size(); j++) {
+                        InternalBucket bucket = timeSeries.getBuckets().get(j);
+                        long key = tsids.add(bucket.key);
+                        List<InternalBucket> buckets;
+                        if (key < 0) {
+                            key = -1 - key;
+                            buckets = tsKeyToBuckets.get(key);
+                        } else {
+                            buckets = new ArrayList<>();
                         }
-                        list.add(bucket);
-                        return list;
-                    });
-                }
-            } else {
-                bucketsList = new HashMap<>(timeSeries.buckets.size());
-                for (InternalTimeSeries.InternalBucket bucket : timeSeries.buckets) {
-                    List<InternalBucket> bucketList = new ArrayList<>();
-                    bucketList.add(bucket);
-                    bucketsList.put(bucket.key, bucketList);
+                        buckets.add(bucket);
+                        tsKeyToBuckets.put(key, buckets);
+                    }
                 }
+                numTsids = (int) tsids.size();
             }
-        }
-        reduceContext.consumeBucketsAndMaybeBreak(bucketsList.size());
-        InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(bucketsList.size()), keyed, getMetadata());
-        for (Map.Entry<Map<String, Object>, List<InternalBucket>> bucketEntry : bucketsList.entrySet()) {
-            reduced.buckets.add(reduceBucket(bucketEntry.getValue(), reduceContext));
+            reduceContext.consumeBucketsAndMaybeBreak(numTsids);
+            InternalTimeSeries reduced = new InternalTimeSeries(name, new ArrayList<>(numTsids), keyed, getMetadata());
+            for (LongObjectPagedHashMap.Cursor<List<InternalBucket>> tsKeyToBucket : tsKeyToBuckets) {
+                reduced.buckets.add(reduceBucket(tsKeyToBucket.value, reduceContext));
+            }
+            return reduced;
         }
-        return reduced;
     }
 
     @Override
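The rewritten reduce() above leans on the BytesRefHash add() contract to spot a tsid that shows up in more than one shard-level result: add() returns a fresh non-negative ordinal the first time a key is seen, and -1 - ordinal when the same bytes are added again. A minimal self-contained sketch of that contract, not part of the PR (BigArrays.NON_RECYCLING_INSTANCE is used here only to keep the sketch free of circuit-breaker wiring):

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;

class BytesRefHashContractSketch {
    public static void main(String[] args) {
        try (BytesRefHash tsids = new BytesRefHash(16, BigArrays.NON_RECYCLING_INSTANCE)) {
            long first = tsids.add(new BytesRef("tsid-a")); // new key: returns its ordinal, >= 0
            long again = tsids.add(new BytesRef("tsid-a")); // duplicate key: returns -1 - ordinal
            assert first >= 0 && again == -1 - first;       // reduce() undoes this with key = -1 - key
        }
    }
}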
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
index 8a077b100e1ea..f225225d25aec 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java
@@ -10,7 +10,6 @@
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -51,14 +50,14 @@ public TimeSeriesAggregator(
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
-            BytesRef spareKey = new BytesRef();
             BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
             List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
             while (ordsEnum.next()) {
                 long docCount = bucketDocCount(ordsEnum.ord());
+                BytesRef spareKey = new BytesRef();
                 ordsEnum.readValue(spareKey);
                 InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
-                    TimeSeriesIdFieldMapper.decodeTsid(spareKey),
+                    spareKey,
                     docCount,
                     null,
                     keyed
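A note on why spareKey moved inside the while loop above: the old code decoded the tsid into a fresh Map per bucket, so one reusable BytesRef was safe, but the bucket now retains the BytesRef itself as its key, and a single shared instance would leave every bucket aliasing whatever readValue() filled in last. A hypothetical illustration of that hazard, with a plain list standing in for the real BucketOrdsEnum:

import org.apache.lucene.util.BytesRef;

import java.util.ArrayList;
import java.util.List;

class SpareKeyAliasingSketch {
    public static void main(String[] args) {
        BytesRef spare = new BytesRef(); // one shared instance, as in the old loop
        List<BytesRef> keys = new ArrayList<>();
        for (BytesRef read : List.of(new BytesRef("a"), new BytesRef("b"))) {
            // stand-in for ordsEnum.readValue(spare), which repoints the spare at the current value
            spare.bytes = read.bytes;
            spare.offset = read.offset;
            spare.length = read.length;
            keys.add(spare); // every element is the same object...
        }
        System.out.println(keys.get(0).utf8ToString()); // ...so this prints "b", not "a"
    }
}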
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java
index 5f374ca9c7425..200e18505d667 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java
@@ -8,9 +8,12 @@
 
 package org.elasticsearch.search.aggregations.timeseries;
 
+import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,7 +31,16 @@ private List<InternalTimeSeries.InternalBucket> randomBuckets(boolean keyed, Int
         List<Map<String, Object>> keys = randomKeys(bucketKeys(randomIntBetween(1, 4)), numberOfBuckets);
         for (int j = 0; j < numberOfBuckets; j++) {
             long docCount = randomLongBetween(0, Long.MAX_VALUE / (20L * numberOfBuckets));
-            bucketList.add(new InternalTimeSeries.InternalBucket(keys.get(j), docCount, aggregations, keyed));
+            var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
+            for (var entry : keys.get(j).entrySet()) {
+                builder.addString(entry.getKey(), (String) entry.getValue());
+            }
+            try {
+                var key = builder.build().toBytesRef();
+                bucketList.add(new InternalTimeSeries.InternalBucket(key, docCount, aggregations, keyed));
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
         }
         return bucketList;
     }
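For reference, the test above now encodes its random keys the same way the _tsid field does at index time, and InternalBucket#getKey() decodes them back. A rough round-trip sketch using only the APIs that appear in this PR (the "host"/"a" dimension is made up for illustration):

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;

import java.io.IOException;
import java.util.Map;

class TsidRoundTripSketch {
    static Map<String, Object> roundTrip() throws IOException {
        var builder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
        builder.addString("host", "a");                     // one dimension, as the test adds per map entry
        BytesRef encoded = builder.build().toBytesRef();    // the bytes InternalBucket#key now stores
        return TimeSeriesIdFieldMapper.decodeTsid(encoded); // the map getKey() returns, e.g. {host=a}
    }
}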