diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java index f8b1dba1d86b3..1d9a48a1fd923 100644 --- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java +++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java @@ -53,7 +53,7 @@ private List randomBuckets(boolean keyed, InternalAggregations a builder.addKeywordDimension(entry.getKey(), (String) entry.getValue()); } try { - var key = builder.withoutHash().toBytesRef(); + var key = builder.withHash().toBytesRef(); bucketList.add(new InternalBucket(key, docCount, aggregations, keyed)); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java index f9cab9ff49cbf..4326806c90e5e 100644 --- a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java +++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java @@ -19,12 +19,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.aggregations.bucket.AggregationTestCase; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.CheckedConsumer; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; @@ -38,7 +33,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.aggregations.metrics.Sum; import org.elasticsearch.search.aggregations.support.ValuesSourceType; -import org.elasticsearch.test.index.IndexVersionUtils; import java.io.IOException; import java.util.ArrayList; @@ -72,12 +66,36 @@ public void testStandAloneTimeSeriesWithSum() throws IOException { }, ts -> { assertThat(ts.getBuckets(), hasSize(3)); - assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L)); - assertThat(((Sum) ts.getBucketByKey("{dim1=aaa, dim2=xxx}").getAggregations().get("sum")).value(), equalTo(6.0)); - assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L)); - assertThat(((Sum) ts.getBucketByKey("{dim1=aaa, dim2=yyy}").getAggregations().get("sum")).value(), equalTo(8.0)); - assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(4L)); - assertThat(((Sum) ts.getBucketByKey("{dim1=bbb, dim2=zzz}").getAggregations().get("sum")).value(), equalTo(22.0)); + assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o").docCount, + equalTo(4L) + ); + assertThat( + ((Sum) ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o") + .getAggregations() + .get("sum")).value(), + equalTo(22.0) + ); + assertThat( + 
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg").docCount, + equalTo(2L) + ); + assertThat( + ((Sum) ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg") + .getAggregations() + .get("sum")).value(), + equalTo(6.0) + ); + assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4").docCount, + equalTo(2L) + ); + assertThat( + ((Sum) ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4") + .getAggregations() + .get("sum")).value(), + equalTo(8.0) + ); }, new KeywordFieldMapper.KeywordFieldType("dim1"), @@ -107,7 +125,7 @@ public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimens fields.add(new DoubleDocValuesField(metrics[i].toString(), (double) metrics[i + 1])); } } - fields.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, builder.withoutHash().toBytesRef())); + fields.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, builder.withHash().toBytesRef())); // TODO: Handle metrics iw.addDocument(fields); } @@ -158,23 +176,38 @@ public void testMultiBucketAggregationAsSubAggregation() throws IOException { Consumer verifier = ts -> { assertThat(ts.getBuckets(), hasSize(3)); - assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L)); - InternalDateHistogram byTimeStampBucket = ts.getBucketByKey("{dim1=aaa, dim2=xxx}").getAggregations().get("by_timestamp"); + assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o").docCount, + equalTo(4L) + ); + InternalDateHistogram byTimeStampBucket = ts.getBucketByKey( + "C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o" + ).getAggregations().get("by_timestamp"); assertThat( byTimeStampBucket.getBuckets(), - contains(new InternalDateHistogram.Bucket(startTime, 2, false, null, InternalAggregations.EMPTY)) + contains(new InternalDateHistogram.Bucket(startTime, 4, false, null, InternalAggregations.EMPTY)) ); - assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L)); - byTimeStampBucket = ts.getBucketByKey("{dim1=aaa, dim2=yyy}").getAggregations().get("by_timestamp"); + assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg").docCount, + equalTo(2L) + ); + byTimeStampBucket = ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg") + .getAggregations() + .get("by_timestamp"); assertThat( byTimeStampBucket.getBuckets(), contains(new InternalDateHistogram.Bucket(startTime, 2, false, null, InternalAggregations.EMPTY)) ); - assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(4L)); - byTimeStampBucket = ts.getBucketByKey("{dim1=bbb, dim2=zzz}").getAggregations().get("by_timestamp"); + assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4").docCount, + equalTo(2L) + ); + byTimeStampBucket = ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4") + .getAggregations() + .get("by_timestamp"); assertThat( byTimeStampBucket.getBuckets(), - contains(new InternalDateHistogram.Bucket(startTime, 4, false, null, InternalAggregations.EMPTY)) + contains(new InternalDateHistogram.Bucket(startTime, 2, false, null, InternalAggregations.EMPTY)) ); }; @@ -191,9 +224,24 @@ public void testAggregationSize() throws IOException { List> verifiers = 
new ArrayList>(); - verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L))); - verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L))); - verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(4L))); + verifiers.add( + ts -> assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o").docCount, + equalTo(4L) + ) + ); + verifiers.add( + ts -> assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg").docCount, + equalTo(2L) + ) + ); + verifiers.add( + ts -> assertThat( + ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4").docCount, + equalTo(2L) + ) + ); for (int i = 1; i <= 3; i++) { int size = i; diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/delete/70_tsdb.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/delete/70_tsdb.yml index 130f3690bb298..6f761f2d4a5e1 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/delete/70_tsdb.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/delete/70_tsdb.yml @@ -49,15 +49,15 @@ setup: location: swamp temperature: 32.4 humidity: 88.9 - - match: { _id: crxuhC8WO3aVdhvtAAABiHD35_g } + - match: { _id: crxuhDzuYSDy965GAAABiHD35_g } - match: { result: created } - match: { _version: 1 } - do: delete: index: weather_sensors - id: crxuhC8WO3aVdhvtAAABiHD35_g - - match: { _id: crxuhC8WO3aVdhvtAAABiHD35_g } + id: crxuhDzuYSDy965GAAABiHD35_g + - match: { _id: crxuhDzuYSDy965GAAABiHD35_g } - match: { result: deleted } - match: { _version: 2 } @@ -74,6 +74,6 @@ setup: location: swamp temperature: 32.4 humidity: 88.9 - - match: { _id: crxuhC8WO3aVdhvtAAABiHD35_g } + - match: { _id: crxuhDzuYSDy965GAAABiHD35_g } - match: { result: created } - match: { _version: 3 } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java index b7ee67b907496..62a5d937ffd05 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java @@ -13,7 +13,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.StringHelper; import org.elasticsearch.cluster.routing.IndexRouting; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.hash.Murmur3Hasher; import org.elasticsearch.common.hash.MurmurHash3; @@ -37,13 +36,10 @@ import java.io.IOException; import java.net.InetAddress; -import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.Base64; import java.util.Collections; import java.util.Comparator; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -58,9 +54,7 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper { public static final String CONTENT_TYPE = "_tsid"; public static final TimeSeriesIdFieldType FIELD_TYPE = new TimeSeriesIdFieldType(); public static final TimeSeriesIdFieldMapper INSTANCE = new TimeSeriesIdFieldMapper(); - - // NOTE: used by {@link TimeSeriesIdFieldMapper#decodeTsid(StreamInput)} )}. 
-    public static final int TSID_HASH_SENTINEL = 0xBAADCAFE;
+    private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();

     @Override
     public FieldMapper.Builder getMergeBuilder() {
@@ -164,31 +158,9 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
      */
     public static Object decodeTsid(StreamInput in) {
         try {
-            int sizeOrTsidHashSentinel = in.readVInt();
-            if (sizeOrTsidHashSentinel == TSID_HASH_SENTINEL) {
-                return Base64.getUrlEncoder().withoutPadding().encodeToString(in.readBytesRef().bytes);
-            }
-            Map<String, Object> result = new LinkedHashMap<>(sizeOrTsidHashSentinel);
-
-            for (int i = 0; i < sizeOrTsidHashSentinel; i++) {
-                String name = in.readBytesRef().utf8ToString();
-
-                int type = in.read();
-                switch (type) {
-                    case (byte) 's' -> // parse a string
-                        result.put(name, in.readBytesRef().utf8ToString());
-                    case (byte) 'l' -> // parse a long
-                        result.put(name, in.readLong());
-                    case (byte) 'u' -> { // parse an unsigned_long
-                        Object ul = DocValueFormat.UNSIGNED_LONG_SHIFTED.format(in.readLong());
-                        result.put(name, ul);
-                    }
-                    default -> throw new IllegalArgumentException("Cannot parse [" + name + "]: Unknown type [" + type + "]");
-                }
-            }
-            return result;
-        } catch (IOException | IllegalArgumentException e) {
-            throw new IllegalArgumentException("Error formatting " + NAME + ": " + e.getMessage(), e);
+            return base64Encode(in.readBytesRef());
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Unable to read tsid", e);
         }
     }
@@ -250,7 +222,7 @@ public BytesReference withHash() throws IOException {
             // NOTE: hash all dimension field names
             int numberOfDimensions = Math.min(MAX_DIMENSIONS, dimensions.size());
             int tsidHashIndex = 0;
-            byte[] tsidHash = new byte[16 + 16 + 16 + 4 * numberOfDimensions];
+            byte[] tsidHash = new byte[16 + 16 + 4 * numberOfDimensions];

             tsidHasher.reset();
             for (final DimensionDataHolder dimension : dimensions) {
@@ -258,13 +230,6 @@ public BytesReference withHash() throws IOException {
             }
             tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);

-            // NOTE: hash all metric field names
-            tsidHasher.reset();
-            for (final String metric : metrics) {
-                tsidHasher.update(metric.getBytes(StandardCharsets.UTF_8));
-            }
-            tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);
-
             // NOTE: concatenate all dimension value hashes up to a certain number of dimensions
             int tsidHashStartIndex = tsidHashIndex;
             for (final DimensionDataHolder dimension : dimensions) {
@@ -292,7 +257,6 @@ public BytesReference withHash() throws IOException {
             assert tsidHashIndex == tsidHash.length;
             try (BytesStreamOutput out = new BytesStreamOutput()) {
-                out.writeVInt(TSID_HASH_SENTINEL);
                 out.writeBytesRef(new BytesRef(tsidHash, 0, tsidHash.length));
                 return out.bytes();
             }
@@ -374,10 +338,10 @@ private void add(String fieldName, BytesReference encoded) throws IOException {
     }

     public static Object decodeTsid(BytesRef bytesRef) {
-        try (StreamInput input = new BytesArray(bytesRef).streamInput()) {
-            return decodeTsid(input);
-        } catch (IOException ex) {
-            throw new IllegalArgumentException("Dimension field cannot be deserialized.", ex);
-        }
+        return base64Encode(bytesRef);
+    }
+
+    private static String base64Encode(BytesRef bytesRef) {
+        return BASE64_ENCODER.encodeToString(bytesRef.bytes);
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
index 6e17966a1c4da..1ff067f24f29b 100644
--- a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
+++ b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
@@ -11,7 +11,6 @@
 import org.apache.lucene.document.InetAddressPoint;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.TransportVersions;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -21,7 +20,6 @@
 import org.elasticsearch.common.time.DateMathParser;
 import org.elasticsearch.geometry.utils.Geohash;
 import org.elasticsearch.index.mapper.DateFieldMapper;
-import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper.TimeSeriesIdBuilder;
 import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
@@ -694,11 +692,25 @@ public String toString() {
         @Override
         public Object format(BytesRef value) {
-            return TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(value).streamInput());
+            return Base64.getUrlEncoder().withoutPadding().encodeToString(value.bytes);
         }

         @Override
         public BytesRef parseBytesRef(Object value) {
+            if (value instanceof BytesRef) {
+                return (BytesRef) value;
+            }
+            return plainTsidParseBytesRef(value);
+        }
+
+        /**
+         * With the introduction of tsid hashing this parsing logic is deprecated: a hashed tsid
+         * cannot be decoded back into the dimension field key/value pairs it was built from.
+         *
+         * @param value the Map encoding the tsid dimension field key/value pairs
+         * @return the {@link BytesRef} encoding of the provided dimension key/value pairs
+         */
+        private BytesRef plainTsidParseBytesRef(Object value) {
             if (value instanceof Map == false) {
                 throw new IllegalArgumentException("Cannot parse tsid object [" + value + "]");
             }
@@ -724,7 +736,7 @@ public BytesRef parseBytesRef(Object value) {
             }

             try {
-                return builder.withHash().toBytesRef();
+                return builder.withoutHash().toBytesRef();
             } catch (IOException e) {
                 throw new IllegalArgumentException(e);
             }
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java b/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java
index 8f6b5fb452de7..c94e0dd14ace0 100644
--- a/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java
+++ b/server/src/test/java/org/elasticsearch/index/mapper/IdLoaderTests.java
@@ -53,8 +53,8 @@ public void testSynthesizeIdSimple() throws Exception {
         long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
         List<Doc> docs = List.of(
-            new Doc(startTime, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "xxx"))),
-            new Doc(startTime + 1, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "yyy"))),
+            new Doc(startTime, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "yyy"))),
+            new Doc(startTime + 1, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "xxx"))),
             new Doc(startTime + 2, List.of(new Dimension("dim1", "bbb"), new Dimension("dim2", "xxx")))
         );
         CheckedConsumer<IndexReader, IOException> verify = indexReader -> {
@@ -62,9 +62,10 @@ public void testSynthesizeIdSimple() throws Exception {
             LeafReader leafReader = indexReader.leaves().get(0).reader();
             assertThat(leafReader.numDocs(), equalTo(3));
             var leaf = idLoader.leaf(null, leafReader, new int[] { 0, 1, 2 });
-            assertThat(leaf.getId(0), equalTo(expectedId(routing, docs.get(0))));
+            // NOTE: time series data is ordered by (tsid, timestamp)
+            assertThat(leaf.getId(0), equalTo(expectedId(routing, docs.get(2))));
             assertThat(leaf.getId(1),
equalTo(expectedId(routing, docs.get(1)))); - assertThat(leaf.getId(2), equalTo(expectedId(routing, docs.get(2)))); + assertThat(leaf.getId(2), equalTo(expectedId(routing, docs.get(0)))); }; prepareIndexReader(indexAndForceMerge(routing, docs), verify, false); } @@ -234,7 +235,7 @@ private static void indexDoc(IndexRouting.ExtractFromSource routing, IndexWriter fields.add(new SortedSetDocValuesField(dimension.field, new BytesRef(dimension.value.toString()))); } } - BytesRef tsid = builder.withoutHash().toBytesRef(); + BytesRef tsid = builder.withHash().toBytesRef(); fields.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, tsid)); iw.addDocument(fields); } @@ -252,7 +253,7 @@ private static String expectedId(IndexRouting.ExtractFromSource routing, Doc doc return TsidExtractingIdFieldMapper.createId( false, routingBuilder, - timeSeriesIdBuilder.withoutHash().toBytesRef(), + timeSeriesIdBuilder.withHash().toBytesRef(), doc.timestamp, new byte[16] ); diff --git a/server/src/test/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapperTests.java index c4227823d2391..6ec638a6039b1 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapperTests.java @@ -21,11 +21,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.List; -import java.util.Map; -import static org.elasticsearch.test.MapMatcher.assertMap; -import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -97,10 +93,7 @@ public void testEnabledInTimeSeriesMode() throws Exception { ); assertThat(doc.rootDoc().getField("a").binaryValue(), equalTo(new BytesRef("value"))); assertThat(doc.rootDoc().getField("b").numericValue(), equalTo(100L)); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), - matchesMap().entry("a", "value").entry("b", 100L) - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), "AWE"); } public void testDisabledInStandardMode() throws Exception { @@ -145,10 +138,7 @@ public void testStrings() throws IOException { docMapper, b -> b.field("a", "foo").field("b", "bar").field("c", "baz").startObject("o").field("e", "bort").endObject() ); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), - matchesMap().entry("a", "foo").entry("o.e", "bort") - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), "AWE"); } @SuppressWarnings("unchecked") @@ -161,12 +151,8 @@ public void testUnicodeKeys() throws IOException { })); ParsedDocument doc = parseDocument(docMapper, b -> b.field(fire, "hot").field(coffee, "good")); - Map tsid = (Map) TimeSeriesIdFieldMapper.decodeTsid( - new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes) - ); - assertMap(tsid, matchesMap().entry(coffee, "good").entry(fire, "hot")); - // Also make sure the keys are in order - assertThat(List.copyOf(tsid.keySet()), equalTo(List.of(coffee, fire))); + Object tsid = TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)); + assertEquals(tsid, "A-I"); } @SuppressWarnings("unchecked") 
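Note on the expected literals in the assertions above and below: decodeTsid now returns the URL-safe, unpadded Base64 encoding of the raw _tsid bytes, so strings such as "AWE" and "A-I" are opaque encodings rather than decoded dimension maps. A minimal standalone sketch of that encoding, with a hypothetical two-byte input chosen only because it happens to encode to "AWE":

import java.util.Base64;

class TsidBase64Sketch {
    public static void main(String[] args) {
        byte[] tsidBytes = { 0x01, 0x61 }; // hypothetical raw _tsid bytes
        // Same encoder settings as BASE64_ENCODER in TimeSeriesIdFieldMapper: URL-safe alphabet, no padding.
        System.out.println(Base64.getUrlEncoder().withoutPadding().encodeToString(tsidBytes)); // prints: AWE
    }
}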
@@ -176,10 +162,7 @@ public void testKeywordTooLong() throws IOException { })); ParsedDocument doc = parseDocument(docMapper, b -> b.field("a", "more_than_1024_bytes".repeat(52))); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), - matchesMap().entry("a", "more_than_1024_bytes".repeat(52)) - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), "AQ"); } @SuppressWarnings("unchecked") @@ -190,10 +173,7 @@ public void testKeywordTooLongUtf8() throws IOException { String theWordLong = "長い"; ParsedDocument doc = parseDocument(docMapper, b -> b.field("a", theWordLong.repeat(200))); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), - matchesMap().entry("a", theWordLong.repeat(200)) - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), "AQ"); } public void testKeywordNull() throws IOException { @@ -232,10 +212,7 @@ public void testLong() throws IOException { b.field("c", "baz"); b.startObject("o").field("e", 1234).endObject(); }); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), - matchesMap().entry("kw", "kw").entry("a", 1L).entry("o.e", 1234L) - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), "AWFs"); } public void testLongInvalidString() throws IOException { @@ -287,10 +264,7 @@ public void testInteger() throws IOException { b.field("c", "baz"); b.startObject("o").field("e", Integer.MIN_VALUE).endObject(); }); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), - matchesMap().entry("kw", "kw").entry("a", 1L).entry("o.e", (long) Integer.MIN_VALUE) - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), "AWFs"); } public void testIntegerInvalidString() throws IOException { @@ -346,10 +320,7 @@ public void testShort() throws IOException { b.field("c", "baz"); b.startObject("o").field("e", Short.MIN_VALUE).endObject(); }); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), - matchesMap().entry("kw", "kw").entry("a", 1L).entry("o.e", (long) Short.MIN_VALUE) - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), "AWFs"); } public void testShortInvalidString() throws IOException { @@ -405,10 +376,7 @@ public void testByte() throws IOException { b.field("c", "baz"); b.startObject("o").field("e", (int) Byte.MIN_VALUE).endObject(); }); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), - matchesMap().entry("kw", "kw").entry("a", 1L).entry("o.e", (long) Byte.MIN_VALUE) - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(tsid).streamInput()), "AWFs"); } public void testByteInvalidString() throws IOException { @@ -464,10 +432,7 @@ public void testIp() throws IOException { b.field("c", "baz"); b.startObject("o").field("e", "255.255.255.1").endObject(); }); - assertMap( - (Map) TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), - matchesMap().entry("kw", "kw").entry("a", "192.168.0.1").entry("o.e", "255.255.255.1") - ); + assertEquals(TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)), "AWFz"); } public void 
testIpInvalidString() throws IOException { @@ -502,16 +467,11 @@ public void testVeryLarge() throws IOException { } }); - Map tsidAsMap = (Map) TimeSeriesIdFieldMapper.decodeTsid( - new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes) + Object tsid = TimeSeriesIdFieldMapper.decodeTsid(new ByteArrayStreamInput(doc.rootDoc().getBinaryValue("_tsid").bytes)); + assertEquals( + tsid, + "AWJzA2ZvbwJkMHPwBm1hbnkgd29yZHMgbWFueSB3b3JkcyBtYW55IHdvcmRzIG1hbnkgd29yZHMgbWFueSB3b3JkcyBtYW55IHdvcmRzIG1hbnkgd29yZHMgbWFueSB3b3JkcyA" ); - for (final Map.Entry entry : tsidAsMap.entrySet()) { - if ("b".equals(entry.getKey())) { - assertEquals("foo", entry.getValue()); - } else { - assertEquals(entry.getValue(), large); - } - } } /** diff --git a/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java b/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java index eac9cee3aee82..c50a42099b037 100644 --- a/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java +++ b/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java @@ -377,9 +377,9 @@ public void testParseTsid() throws IOException { timeSeriesIdBuilder.addKeywordDimension("string", randomAlphaOfLength(10)); timeSeriesIdBuilder.addLongDimension("long", randomLong()); timeSeriesIdBuilder.addUnsignedLongDimension("ulong", randomLong()); - BytesRef tsidBytes = timeSeriesIdBuilder.withoutHash().toBytesRef(); - Object tsidFormat = DocValueFormat.TIME_SERIES_ID.format(tsidBytes); - BytesRef tsidParse = DocValueFormat.TIME_SERIES_ID.parseBytesRef(tsidFormat); - assertEquals(tsidBytes, tsidParse); + BytesRef expected = timeSeriesIdBuilder.withHash().toBytesRef(); + Object tsidFormat = DocValueFormat.TIME_SERIES_ID.format(expected); + BytesRef actual = DocValueFormat.TIME_SERIES_ID.parseBytesRef(tsidFormat); + assertEquals(expected, actual); } } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregatorTests.java index 46099ebba74eb..333fc888a8f0c 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/TimeSeriesRateAggregatorTests.java @@ -82,8 +82,13 @@ public void testNestedWithinDateHistogram() throws IOException { Consumer verifier = r -> { assertThat(r.getBuckets(), hasSize(2)); - assertThat(r.getBucketByKey("{dim=1}"), instanceOf(InternalTimeSeries.InternalBucket.class)); - InternalDateHistogram hb = r.getBucketByKey("{dim=1}").getAggregations().get("date"); + assertThat( + r.getBucketByKey("NFFUy14C9UcX3MnFnsFrpf0AAAAAAAAAAAAAAAAAAAAAiw707uTAKMYIIZrxFHeWv4t2N0w"), + instanceOf(InternalTimeSeries.InternalBucket.class) + ); + InternalDateHistogram hb = r.getBucketByKey("NFFUy14C9UcX3MnFnsFrpf0AAAAAAAAAAAAAAAAAAAAAiw707uTAKMYIIZrxFHeWv4t2N0w") + .getAggregations() + .get("date"); { Rate rate = hb.getBuckets().get(1).getAggregations().get("counter_field"); assertThat(rate.getValue(), closeTo((60 - 37 + 14) / 2000.0 * MILLIS_IN_SECOND, 0.00001)); @@ -92,7 +97,7 @@ public void testNestedWithinDateHistogram() throws IOException { Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field"); assertThat(rate.getValue(), closeTo((37 - 15) / 1000.0 * MILLIS_IN_SECOND, 0.00001)); } - hb = r.getBucketByKey("{dim=2}").getAggregations().get("date"); + hb = 
r.getBucketByKey("NFFUy14C9UcX3MnFnsFrpf0AAAAAAAAAAAAAAAAAAAAAoUYmO5acXOT4AOJNerMhm2RoK9I").getAggregations().get("date"); { Rate rate = hb.getBuckets().get(0).getAggregations().get("counter_field"); assertThat(rate.getValue(), closeTo((150 - 74) / 1000.0 * MILLIS_IN_SECOND, 0.00001)); @@ -162,7 +167,7 @@ private List docs(long startTimestamp, String dim, long... values) thr private static BytesReference tsid(String dim) throws IOException { TimeSeriesIdFieldMapper.TimeSeriesIdBuilder idBuilder = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null); idBuilder.addKeywordDimension("dim", dim); - return idBuilder.withoutHash(); + return idBuilder.withHash(); } private Document doc(long timestamp, BytesReference tsid, long counterValue) { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java new file mode 100644 index 0000000000000..54e3bed8d4b32 --- /dev/null +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldProducer.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.downsample; + +import org.elasticsearch.index.fielddata.FormattedDocValues; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class DimensionFieldProducer extends AbstractDownsampleFieldProducer { + private final Dimension dimension; + + DimensionFieldProducer(final String name, final Dimension dimension) { + super(name); + this.dimension = dimension; + } + + static class Dimension { + private final String name; + private Object value; + private boolean isEmpty; + + Dimension(String name) { + this.name = name; + this.isEmpty = true; + } + + public Object value() { + return value; + } + + public String name() { + return name; + } + + void reset() { + value = null; + isEmpty = true; + } + + void collect(final Object value) { + Objects.requireNonNull(value); + if (isEmpty) { + this.value = value; + this.isEmpty = false; + return; + } + if (value.equals(this.value) == false) { + throw new IllegalArgumentException("Dimension value changed without tsid change [" + value + "] != [" + this.value + "]"); + } + } + } + + @Override + public void reset() { + this.dimension.reset(); + } + + @Override + public boolean isEmpty() { + return this.dimension.isEmpty; + } + + @Override + public void collect(FormattedDocValues docValues, int docId) throws IOException { + if (docValues.advanceExact(docId) == false) { + throw new IllegalArgumentException("Unable to collect dimension [" + this.dimension.name + "]"); + } + int docValueCount = docValues.docValueCount(); + for (int i = 0; i < docValueCount; i++) { + this.dimension.collect(docValues.nextValue()); + } + } + + @Override + public void write(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.field(this.dimension.name, this.dimension.value()); + } + } +} diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java new file mode 100644 index 
0000000000000..c6ef43cfdacfa
--- /dev/null
+++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldValueFetcher.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.downsample;
+
+import org.elasticsearch.index.fielddata.IndexFieldData;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.query.SearchExecutionContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class DimensionFieldValueFetcher extends FieldValueFetcher {
+
+    private final DimensionFieldProducer dimensionFieldProducer = createFieldProducer();
+
+    protected DimensionFieldValueFetcher(final MappedFieldType fieldType, final IndexFieldData<?> fieldData) {
+        super(fieldType.name(), fieldType, fieldData);
+    }
+
+    private DimensionFieldProducer createFieldProducer() {
+        final String fieldName = fieldType.name();
+        return new DimensionFieldProducer(fieldName, new DimensionFieldProducer.Dimension(fieldName));
+    }
+
+    @Override
+    public AbstractDownsampleFieldProducer fieldProducer() {
+        return this.dimensionFieldProducer;
+    }
+
+    /**
+     * Retrieve field value fetchers for a list of dimensions.
+     */
+    static List<FieldValueFetcher> create(final SearchExecutionContext context, final String[] dimensions) {
+        List<FieldValueFetcher> fetchers = new ArrayList<>();
+        for (String dimension : dimensions) {
+            MappedFieldType fieldType = context.getFieldType(dimension);
+            assert fieldType != null : "Unknown dimension field type for dimension field: [" + dimension + "]";
+
+            if (context.fieldExistsInIndex(dimension)) {
+                final IndexFieldData<?> fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH);
+                final String fieldName = context.isMultiField(dimension)
+                    ?
fieldType.name().substring(0, fieldType.name().lastIndexOf('.')) + : fieldType.name(); + fetchers.add(new DimensionFieldValueFetcher(fieldType, fieldData)); + } + } + return Collections.unmodifiableList(fetchers); + } +} diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index f74bd299916c1..64da8c8a68080 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -94,6 +94,7 @@ class DownsampleShardIndexer { private final List fieldValueFetchers; private final DownsampleShardTask task; private final DownsampleShardPersistentTaskState state; + private final String[] dimensions; private volatile boolean abort = false; ByteSizeValue downsampleBulkSize = DOWNSAMPLE_BULK_SIZE; ByteSizeValue downsampleMaxBytesInFlight = DOWNSAMPLE_MAX_BYTES_IN_FLIGHT; @@ -107,6 +108,7 @@ class DownsampleShardIndexer { final DownsampleConfig config, final String[] metrics, final String[] labels, + final String[] dimensions, final DownsampleShardPersistentTaskState state ) { this.task = task; @@ -125,13 +127,15 @@ class DownsampleShardIndexer { null, Collections.emptyMap() ); + this.dimensions = dimensions; this.timestampField = (DateFieldMapper.DateFieldType) searchExecutionContext.getFieldType(config.getTimestampField()); this.timestampFormat = timestampField.docValueFormat(null, null); this.rounding = config.createRounding(); - List fetchers = new ArrayList<>(metrics.length + labels.length); + List fetchers = new ArrayList<>(metrics.length + labels.length + dimensions.length); fetchers.addAll(FieldValueFetcher.create(searchExecutionContext, metrics)); fetchers.addAll(FieldValueFetcher.create(searchExecutionContext, labels)); + fetchers.addAll(DimensionFieldValueFetcher.create(searchExecutionContext, dimensions)); this.fieldValueFetchers = Collections.unmodifiableList(fetchers); toClose = null; } finally { @@ -155,7 +159,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept BulkProcessor2 bulkProcessor = createBulkProcessor(); try (searcher; bulkProcessor) { final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(this::checkCancelled)); - TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor); + TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor, this.dimensions); bucketCollector.preCollection(); timeSeriesSearcher.search(initialStateQuery, bucketCollector); } @@ -332,12 +336,12 @@ private class TimeSeriesBucketCollector extends BucketCollector { long lastTimestamp = Long.MAX_VALUE; long lastHistoTimestamp = Long.MAX_VALUE; - TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor) { + TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor, String[] dimensions) { this.bulkProcessor = bulkProcessor; AbstractDownsampleFieldProducer[] fieldProducers = fieldValueFetchers.stream() .map(FieldValueFetcher::fieldProducer) .toArray(AbstractDownsampleFieldProducer[]::new); - this.downsampleBucketBuilder = new DownsampleBucketBuilder(fieldProducers); + this.downsampleBucketBuilder = new DownsampleBucketBuilder(fieldProducers, dimensions); } @Override @@ -482,9 +486,11 @@ private class DownsampleBucketBuilder { private int docCount; private final 
AbstractDownsampleFieldProducer[] fieldProducers; private final DownsampleFieldSerializer[] groupedProducers; + private final String[] dimensions; - DownsampleBucketBuilder(AbstractDownsampleFieldProducer[] fieldProducers) { + DownsampleBucketBuilder(AbstractDownsampleFieldProducer[] fieldProducers, String[] dimensions) { this.fieldProducers = fieldProducers; + this.dimensions = dimensions; /* * The downsample field producers for aggregate_metric_double all share the same name (this is * the name they will be serialized in the target index). We group all field producers by @@ -545,12 +551,13 @@ public XContentBuilder buildDownsampleDocument() throws IOException { } builder.field(timestampField.name(), timestampFormat.format(timestamp)); builder.field(DocCountFieldMapper.NAME, docCount); + // TODO: should we make sure extracting dimension fields is backward compatible with older indices versions? // Extract dimension values from _tsid field, so we avoid loading them from doc_values - Map dimensions = (Map) DocValueFormat.TIME_SERIES_ID.format(tsid); - for (Map.Entry e : dimensions.entrySet()) { - assert e.getValue() != null; - builder.field((String) e.getKey(), e.getValue()); - } + // Map dimensions = (Map) DocValueFormat.TIME_SERIES_ID.format(tsid); + // for (Map.Entry e : dimensions.entrySet()) { + // assert e.getValue() != null; + // builder.field((String) e.getKey(), e.getValue()); + // } // Serialize fields for (DownsampleFieldSerializer fieldProducer : groupedProducers) { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 06e69ab4702c1..ab613c2df5529 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -200,6 +200,7 @@ protected void doRun() throws Exception { params.downsampleConfig(), params.metrics(), params.labels(), + params.dimensions(), initialState ); downsampleShardIndexer.execute(); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java index 6a4ee88a0cdef..6288fc0d6f9c7 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java @@ -32,7 +32,8 @@ public record DownsampleShardTaskParams( long indexEndTimeMillis, ShardId shardId, String[] metrics, - String[] labels + String[] labels, + String[] dimensions ) implements PersistentTaskParams { public static final String NAME = DownsampleShardTask.TASK_NAME; @@ -43,6 +44,7 @@ public record DownsampleShardTaskParams( private static final ParseField SHARD_ID = new ParseField("shard_id"); private static final ParseField METRICS = new ParseField("metrics"); private static final ParseField LABELS = new ParseField("labels"); + private static final ParseField DIMENSIONS = new ParseField("dimensions"); public static final ObjectParser PARSER = new ObjectParser<>(NAME); static { @@ -57,6 +59,7 @@ public record DownsampleShardTaskParams( PARSER.declareString(DownsampleShardTaskParams.Builder::shardId, 
SHARD_ID); PARSER.declareStringArray(DownsampleShardTaskParams.Builder::metrics, METRICS); PARSER.declareStringArray(DownsampleShardTaskParams.Builder::labels, LABELS); + PARSER.declareStringArray(DownsampleShardTaskParams.Builder::dimensions, DIMENSIONS); } DownsampleShardTaskParams(final StreamInput in) throws IOException { @@ -67,7 +70,8 @@ public record DownsampleShardTaskParams( in.readVLong(), new ShardId(in), in.readStringArray(), - in.readStringArray() + in.readStringArray(), + in.readOptionalStringArray() ); } @@ -81,6 +85,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(SHARD_ID.getPreferredName(), shardId); builder.array(METRICS.getPreferredName(), metrics); builder.array(LABELS.getPreferredName(), labels); + builder.array(DIMENSIONS.getPreferredName(), dimensions); return builder.endObject(); } @@ -103,6 +108,7 @@ public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); out.writeStringArray(metrics); out.writeStringArray(labels); + out.writeOptionalStringArray(dimensions); } public static DownsampleShardTaskParams fromXContent(XContentParser parser) throws IOException { @@ -123,7 +129,8 @@ public boolean equals(Object o) { && Objects.equals(shardId.id(), that.shardId.id()) && Objects.equals(shardId.getIndexName(), that.shardId.getIndexName()) && Arrays.equals(metrics, that.metrics) - && Arrays.equals(labels, that.labels); + && Arrays.equals(labels, that.labels) + && Arrays.equals(dimensions, that.dimensions); } @Override @@ -138,6 +145,7 @@ public int hashCode() { ); result = 31 * result + Arrays.hashCode(metrics); result = 31 * result + Arrays.hashCode(labels); + result = 31 * result + Arrays.hashCode(dimensions); return result; } @@ -149,6 +157,7 @@ public static class Builder { ShardId shardId; String[] metrics; String[] labels; + String[] dimensions; public Builder downsampleConfig(final DownsampleConfig downsampleConfig) { this.downsampleConfig = downsampleConfig; @@ -185,6 +194,11 @@ public Builder labels(final List labels) { return this; } + public Builder dimensions(final List dimensions) { + this.dimensions = dimensions.toArray(String[]::new); + return this; + } + public DownsampleShardTaskParams build() { return new DownsampleShardTaskParams( downsampleConfig, @@ -193,7 +207,8 @@ public DownsampleShardTaskParams build() { indexEndTimeMillis, shardId, metrics, - labels + labels, + dimensions ); } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index 2788932a228a8..74375bbe27939 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -37,7 +37,7 @@ protected FieldValueFetcher(String name, MappedFieldType fieldType, IndexFieldDa this.name = name; this.fieldType = fieldType; this.fieldData = fieldData; - this.fieldProducer = createieldProducer(); + this.fieldProducer = createFieldProducer(); } public String name() { @@ -53,7 +53,7 @@ public AbstractDownsampleFieldProducer fieldProducer() { return fieldProducer; } - private AbstractDownsampleFieldProducer createieldProducer() { + private AbstractDownsampleFieldProducer createFieldProducer() { if (fieldType.getMetricType() != null) { return switch (fieldType.getMetricType()) { case GAUGE -> new 
MetricFieldProducer.GaugeMetricFieldProducer(name()); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 322267a14d32f..4af06a1c6543f 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -334,7 +334,8 @@ protected void masterOperation( downsampleIndexName, parentTask, metricFields, - labelFields + labelFields, + dimensionFields ); } else { listener.onFailure(new ElasticsearchException("Failed to create downsample index [" + downsampleIndexName + "]")); @@ -348,7 +349,8 @@ protected void masterOperation( downsampleIndexName, parentTask, metricFields, - labelFields + labelFields, + dimensionFields ); } else { listener.onFailure(e); @@ -366,7 +368,8 @@ private void performShardDownsampling( String downsampleIndexName, TaskId parentTask, List metricFields, - List labelFields + List labelFields, + List dimensionFields ) { final int numberOfShards = sourceIndexMetadata.getNumberOfShards(); final Index sourceIndex = sourceIndexMetadata.getIndex(); @@ -386,6 +389,7 @@ private void performShardDownsampling( downsampleIndexName, metricFields, labelFields, + dimensionFields, shardId ); Predicate> predicate = runningTask -> { @@ -481,6 +485,7 @@ private static DownsampleShardTaskParams createPersistentTaskParams( final String targetIndexName, final List metricFields, final List labelFields, + final List dimensionFields, final ShardId shardId ) { return new DownsampleShardTaskParams( @@ -490,7 +495,8 @@ private static DownsampleShardTaskParams createPersistentTaskParams( parseTimestamp(sourceIndexMetadata, IndexSettings.TIME_SERIES_END_TIME), shardId, metricFields.toArray(new String[0]), - labelFields.toArray(new String[0]) + labelFields.toArray(new String[0]), + dimensionFields.toArray(new String[0]) ); } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java index a7c34cacae5be..24d1df638f80b 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java @@ -144,6 +144,7 @@ protected DownsampleIndexerAction.ShardDownsampleResponse shardOperation( request.getRollupConfig(), request.getMetricFields(), request.getLabelFields(), + request.getDimensionFields(), new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); return indexer.execute(); diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index c0abab1234133..8beca42bf2a5f 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -627,6 +627,7 @@ public void testCancelDownsampleIndexer() throws IOException { config, new String[] { 
FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, + new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); @@ -675,6 +676,7 @@ public void testDownsampleBulkFailed() throws IOException { config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, + new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); @@ -741,6 +743,7 @@ public void testTooManyBytesInFlight() throws IOException { config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, + new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); /* @@ -792,6 +795,7 @@ public void testDownsampleStats() throws IOException { config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, + new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); @@ -848,6 +852,7 @@ public void testResumeDownsample() throws IOException { config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, + new String[] { FIELD_DIMENSION_1, FIELD_DIMENSION_2 }, new DownsampleShardPersistentTaskState( DownsampleShardIndexerStatus.STARTED, new BytesRef( @@ -922,37 +927,56 @@ public void testResumeDownsamplePartial() throws IOException { config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, + new String[] { FIELD_DIMENSION_1 }, new DownsampleShardPersistentTaskState( DownsampleShardIndexerStatus.STARTED, + // NOTE: there is just one dimension with two possible values, this needs to be one of the two possible tsid values. 
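+            // A regeneration sketch (hypothetical, not part of this patch): if the tsid hash layout
+            // changes again, bytes like the array below can be recomputed along these lines:
+            //   var b = new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(null);
+            //   b.addKeywordDimension(FIELD_DIMENSION_1, "dim1");
+            //   BytesRef tsid = b.withHash().toBytesRef(); // candidate bytes for this state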
new BytesRef( new byte[] { - 0x01, - 0x0C, - 0x64, - 0x69, - 0x6d, - 0x65, - 0x6E, - 0x73, - 0x69, - 0x6F, - 0x6E, - 0x5F, - 0x6B, - 0x77, + 0x24, + 0x42, + (byte) 0xe4, + (byte) 0x9f, + (byte) 0xe2, + (byte) 0xde, + (byte) 0xbb, + (byte) 0xf8, + (byte) 0xfc, + 0x7d, + 0x1a, + (byte) 0xb1, + 0x27, + (byte) 0x85, + (byte) 0xc2, + (byte) 0x8e, + 0x3a, + (byte) 0xae, + 0x38, + 0x6c, + (byte) 0xf6, + (byte) 0xae, + 0x0f, + 0x4f, + 0x44, + (byte) 0xf1, 0x73, - 0x04, - 0x64, - 0x69, - 0x6D, - 0x32 } + 0x02, + (byte) 0x90, + 0x1d, + 0x79, + (byte) 0xf8, + 0x0d, + (byte) 0xc2, + 0x7e, + (byte) 0x91, + 0x15 } ) ) ); final DownsampleIndexerAction.ShardDownsampleResponse response2 = indexer.execute(); int dim2DocCount = client().prepareSearch(sourceIndex) - .setQuery(new TermQueryBuilder(FIELD_DIMENSION_1, "dim2")) + .setQuery(new TermQueryBuilder(FIELD_DIMENSION_1, "dim1")) .setSize(10_000) .get() .getHits() diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java index 3b04c276d584e..b9e073c3d70dd 100644 --- a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java +++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java @@ -803,7 +803,7 @@ private void assertGeoLine_TSDB( ArrayList fields = new ArrayList<>( Arrays.asList( new SortedDocValuesField("group_id", new BytesRef(testData.groups[g])), - new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, builder.withoutHash().toBytesRef()) + new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, builder.withHash().toBytesRef()) ) ); GeoPoint point = points.get(i);
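Taken together, the patch converges on a single tsid flow: dimensions are accumulated in a TimeSeriesIdBuilder, withHash() condenses them into a fixed-size byte array (byte[16 + 16 + 4 * numberOfDimensions], i.e. hashes over the dimension names and values plus a 4-byte hash per retained dimension value, capped at MAX_DIMENSIONS), and the tsid is only ever surfaced as an opaque URL-safe Base64 string. A minimal sketch of that flow, mirroring how the tests in this patch drive the API (the null constructor argument is the routing builder, as in the tests):

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper.TimeSeriesIdBuilder;

class HashedTsidSketch {
    static String hashedTsid() throws java.io.IOException {
        TimeSeriesIdBuilder builder = new TimeSeriesIdBuilder(null); // null routing builder, as in the tests above
        builder.addKeywordDimension("dim1", "aaa");
        builder.addKeywordDimension("dim2", "xxx");
        BytesRef tsid = builder.withHash().toBytesRef(); // fixed-size hash replaces the old key/value encoding
        return (String) TimeSeriesIdFieldMapper.decodeTsid(tsid); // opaque URL-safe Base64 bucket key
    }
}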