From 5dace550d9220e09064540d8fc72eb361e8709c2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 26 Oct 2020 15:04:26 +0100 Subject: [PATCH] Add custom metadata support to data steams. (#63991) Composable index template may hold custom metadata. This change adds behaviour that when a data stream gets created the custom metadata of the matching composable index template is copied to new data stream. The get data stream api can then be used to view the custom metadata. Example: ``` PUT /_index_template/my-logs-template { "index_patterns": [ "logs-*" ], "data_stream": { }, "template": { "settings": { "index.number_of_replicas": 0 } }, "_meta": { "managed": true } } PUT /_data_stream/logs-myapp GET /_data_stream ``` The get data stream api then yields the following response: ``` { "data_streams": [ { "name": "logs-myapp", "timestamp_field": { "name": "@timestamp" }, "indices": [ { "index_name": ".ds-logs-myapp-000001", "index_uuid": "3UaBxM3mQXuHR6qx0IDVCw" } ], "generation": 1, "_meta": { "managed": true }, "status": "GREEN", "template": "my-logs-template" } ] } ``` Closes #59195 --- .../client/indices/DataStream.java | 19 ++++++-- .../indices/GetDataStreamResponseTests.java | 26 ++--------- .../cluster/metadata/DataStream.java | 39 +++++++++++++---- .../MetadataCreateDataStreamService.java | 4 +- .../snapshots/RestoreService.java | 3 +- .../IndexNameExpressionResolverTests.java | 4 +- .../cluster/metadata/MetadataTests.java | 12 +++--- .../cluster/DataStreamTestHelper.java | 11 ++++- .../core/action/GetDataStreamAction.java | 3 ++ .../xpack/core/ilm/RolloverStepTests.java | 2 +- .../UpdateRolloverLifecycleDateStepTests.java | 2 +- .../core/ilm/WaitForActiveShardsTests.java | 2 +- .../ilm/WaitForRolloverReadyStepTests.java | 2 +- .../datastreams/DataStreamIT.java | 43 ++++++++++++++++--- .../datafeed/DatafeedNodeSelectorTests.java | 2 +- .../test/data_stream/10_basic.yml | 42 ++++++++++++++++++ 16 files changed, 156 insertions(+), 60 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java index c7d4c61d420cc..76b5136bedaa6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java @@ -41,9 +41,11 @@ public final class DataStream { String indexTemplate; @Nullable String ilmPolicyName; + @Nullable + private final Map metadata; public DataStream(String name, String timeStampField, List indices, long generation, ClusterHealthStatus dataStreamStatus, - @Nullable String indexTemplate, @Nullable String ilmPolicyName) { + @Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable Map metadata) { this.name = name; this.timeStampField = timeStampField; this.indices = indices; @@ -51,6 +53,7 @@ public DataStream(String name, String timeStampField, List indices, long this.dataStreamStatus = dataStreamStatus; this.indexTemplate = indexTemplate; this.ilmPolicyName = ilmPolicyName; + this.metadata = metadata; } public String getName() { @@ -81,6 +84,10 @@ public String getIlmPolicyName() { return ilmPolicyName; } + public Map getMetadata() { + return metadata; + } + public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); public static final ParseField INDICES_FIELD = new ParseField("indices"); @@ -88,6 +95,7 @@ public String getIlmPolicyName() { public static final ParseField STATUS_FIELD = new ParseField("status"); public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy"); + public static final ParseField METADATA_FIELD = new ParseField("_meta"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", @@ -101,7 +109,8 @@ public String getIlmPolicyName() { ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr); String indexTemplate = (String) args[5]; String ilmPolicy = (String) args[6]; - return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy); + Map metadata = (Map) args[7]; + return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata); }); static { @@ -112,6 +121,7 @@ public String getIlmPolicyName() { PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_TEMPLATE_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -129,11 +139,12 @@ public boolean equals(Object o) { indices.equals(that.indices) && dataStreamStatus == that.dataStreamStatus && Objects.equals(indexTemplate, that.indexTemplate) && - Objects.equals(ilmPolicyName, that.ilmPolicyName); + Objects.equals(ilmPolicyName, that.ilmPolicyName) && + Objects.equals(metadata, that.metadata); } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName); + return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java index 34b5b2af9bf74..3dbcb8afe68d0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java @@ -19,43 +19,25 @@ package org.elasticsearch.client.indices; -import org.elasticsearch.xpack.core.action.GetDataStreamAction; -import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo; import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.cluster.DataStreamTestHelper; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.action.GetDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; -import java.util.List; -import java.util.Locale; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField; -import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; - public class GetDataStreamResponseTests extends AbstractResponseTestCase { - private static List randomIndexInstances() { - int numIndices = randomIntBetween(0, 128); - List indices = new ArrayList<>(numIndices); - for (int i = 0; i < numIndices; i++) { - indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()))); - } - return indices; - } - private static DataStreamInfo randomInstance() { - List indices = randomIndexInstances(); - long generation = indices.size() + randomLongBetween(1, 128); - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random()))); - DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation); + DataStream dataStream = DataStreamTestHelper.randomInstance(); return new DataStreamInfo(dataStream, ClusterHealthStatus.YELLOW, randomAlphaOfLengthBetween(2, 10), randomAlphaOfLengthBetween(2, 10)); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 2c3d446306b99..59dda946417f8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -18,8 +18,10 @@ */ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -35,6 +37,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; public final class DataStream extends AbstractDiffable implements ToXContentObject { @@ -45,18 +48,20 @@ public final class DataStream extends AbstractDiffable implements To private final TimestampField timeStampField; private final List indices; private final long generation; + private final Map metadata; - public DataStream(String name, TimestampField timeStampField, List indices, long generation) { + public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata) { this.name = name; this.timeStampField = timeStampField; this.indices = Collections.unmodifiableList(indices); this.generation = generation; + this.metadata = metadata; assert indices.size() > 0; assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation)); } public DataStream(String name, TimestampField timeStampField, List indices) { - this(name, timeStampField, indices, indices.size()); + this(name, timeStampField, indices, indices.size(), null); } public String getName() { @@ -75,6 +80,11 @@ public long getGeneration() { return generation; } + @Nullable + public Map getMetadata() { + return metadata; + } + /** * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. @@ -87,7 +97,7 @@ public DataStream rollover(Index newWriteIndex) { assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1)); List backingIndices = new ArrayList<>(indices); backingIndices.add(newWriteIndex); - return new DataStream(name, timeStampField, backingIndices, generation + 1); + return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata); } /** @@ -101,7 +111,7 @@ public DataStream removeBackingIndex(Index index) { List backingIndices = new ArrayList<>(indices); backingIndices.remove(index); assert backingIndices.size() == indices.size() - 1; - return new DataStream(name, timeStampField, backingIndices, generation); + return new DataStream(name, timeStampField, backingIndices, generation, metadata); } /** @@ -126,7 +136,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki "it is the write index", existingBackingIndex.getName(), name)); } backingIndices.set(backingIndexPosition, newBackingIndex); - return new DataStream(name, timeStampField, backingIndices, generation); + return new DataStream(name, timeStampField, backingIndices, generation, metadata); } /** @@ -142,7 +152,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene } public DataStream(StreamInput in) throws IOException { - this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong()); + this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(), + in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readMap(): null); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -155,22 +166,28 @@ public void writeTo(StreamOutput out) throws IOException { timeStampField.writeTo(out); out.writeList(indices); out.writeVLong(generation); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeMap(metadata); + } } public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); public static final ParseField INDICES_FIELD = new ParseField("indices"); public static final ParseField GENERATION_FIELD = new ParseField("generation"); + public static final ParseField METADATA_FIELD = new ParseField("_meta"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", - args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3])); + args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3], + (Map) args[4])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD); PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -184,6 +201,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField); builder.field(INDICES_FIELD.getPreferredName(), indices); builder.field(GENERATION_FIELD.getPreferredName(), generation); + if (metadata != null) { + builder.field(METADATA_FIELD.getPreferredName(), metadata); + } builder.endObject(); return builder; } @@ -196,12 +216,13 @@ public boolean equals(Object o) { return name.equals(that.name) && timeStampField.equals(that.timeStampField) && indices.equals(that.indices) && - generation == that.generation; + generation == that.generation && + Objects.equals(metadata, that.metadata); } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation); + return Objects.hash(name, timeStampField, indices, generation, metadata); } public static final class TimestampField implements Writeable, ToXContentObject { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index b47432d202d0e..7d66c2b0b266f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -158,7 +158,9 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn String fieldName = template.getDataStreamTemplate().getTimestampField(); DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName); - DataStream newDataStream = new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex())); + DataStream newDataStream = + new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()), 1L, + template.metadata() != null ? Map.copyOf(template.metadata()) : null); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); logger.info("adding data stream [{}]", request.name); return ClusterState.builder(currentState).metadata(builder).build(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 3821af6861545..3ba7dfafbddb9 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -608,7 +608,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad List updatedIndices = dataStream.getIndices().stream() .map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex()) .collect(Collectors.toList()); - return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration()); + return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(), + dataStream.getMetadata()); } public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index ade851cf15f9d..d592d861ae9b7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -1727,7 +1727,7 @@ public void testIndicesAliasesRequestTargetDataStreams() { Metadata.Builder mdBuilder = Metadata.builder() .put(backingIndex, false) - .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex()), 1)); + .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex()))); ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); { @@ -1916,7 +1916,7 @@ public void testDataStreams() { Metadata.Builder mdBuilder = Metadata.builder() .put(index1, false) .put(index2, false) - .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex()), 2)); + .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex()))); ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 3f71ca848458e..62144be858dd9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -1016,7 +1016,7 @@ public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() { backingIndices.add(im.getIndex()); } - b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum)); + b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null)); Metadata metadata = b.build(); assertThat(metadata.dataStreams().size(), equalTo(1)); assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); @@ -1034,7 +1034,7 @@ public void testBuildIndicesLookupForDataStreams() { indices.add(idx.getIndex()); b.put(idx, true); } - b.put(new DataStream(name, createTimestampField("@timestamp"), indices, indices.size())); + b.put(new DataStream(name, createTimestampField("@timestamp"), indices)); } Metadata metadata = b.build(); @@ -1100,8 +1100,7 @@ public void testValidateDataStreamsThrowsExceptionOnConflict() { DataStream dataStream = new DataStream( dataStreamName, createTimestampField("@timestamp"), - backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), - backingIndices.size() + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()) ); IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices); @@ -1174,8 +1173,7 @@ public void testValidateDataStreamsAllowsPrefixedBackingIndices() { DataStream dataStream = new DataStream( dataStreamName, createTimestampField("@timestamp"), - backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), - backingIndices.size() + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()) ); IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices); @@ -1275,7 +1273,7 @@ private static CreateIndexResult createIndices(int numIndices, int numBackingInd b.put(im, false); backingIndices.add(im.getIndex()); } - b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum)); + b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null)); return new CreateIndexResult(indices, backingIndices, b.build()); } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java index c8a7e3f1a8ec0..8f7871e736c1e 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java @@ -32,11 +32,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; +import static org.elasticsearch.test.ESTestCase.randomBoolean; public final class DataStreamTestHelper { @@ -103,7 +105,11 @@ public static DataStream randomInstance() { long generation = indices.size() + ESTestCase.randomLongBetween(1, 128); String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(LuceneTestCase.random()))); - return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation); + Map metadata = null; + if (randomBoolean()) { + metadata = Map.of("key", "value"); + } + return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata); } /** @@ -127,7 +133,8 @@ public static ClusterState getClusterStateWithDataStreams(List conditionsMet = new SetOnce<>(); Metadata metadata = Metadata.builder().put(indexMetadata, true) .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), - List.of(indexMetadata.getIndex()), 1L)) + List.of(indexMetadata.getIndex()))) .build(); step.evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() { diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 94daa9b951db0..c676aadb04235 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; import org.elasticsearch.xpack.core.action.GetDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo; import org.elasticsearch.xpack.datastreams.DataStreamsPlugin; import org.junit.After; @@ -724,7 +725,7 @@ public void testTimestampFieldCustomAttributes() throws Exception { + " }\n" + " }\n" + " }"; - putComposableIndexTemplate("id1", mapping, List.of("logs-foo*"), null); + putComposableIndexTemplate("id1", mapping, List.of("logs-foo*"), null, null); CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar"); client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); @@ -908,7 +909,7 @@ public void testSearchAllResolvesDataStreams() throws Exception { public void testGetDataStream() throws Exception { Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, maximumNumberOfReplicas() + 2).build(); - putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings); + putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); int numDocsFoo = randomIntBetween(2, 16); indexDocs("metrics-foo", numDocsFoo); @@ -918,7 +919,7 @@ public void testGetDataStream() throws Exception { new GetDataStreamAction.Request(new String[] { "metrics-foo" }) ).actionGet(); assertThat(response.getDataStreams().size(), is(1)); - GetDataStreamAction.Response.DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0); + DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0); assertThat(metricsFooDataStream.getDataStream().getName(), is("metrics-foo")); assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW)); assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo")); @@ -1113,6 +1114,29 @@ public void testQueryDataStreamNameInIndexField() throws Exception { assertThat(searchResponse.getHits().getTotalHits().relation, equalTo(TotalHits.Relation.EQUAL_TO)); } + public void testDataStreamMetadata() throws Exception { + Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + putComposableIndexTemplate("id1", null, List.of("logs-*"), settings, Map.of("managed_by", "core-features")); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { "*" }); + GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest) + .actionGet(); + getDataStreamResponse.getDataStreams().sort(Comparator.comparing(dataStreamInfo -> dataStreamInfo.getDataStream().getName())); + assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1)); + DataStreamInfo info = getDataStreamResponse.getDataStreams().get(0); + assertThat(info.getIndexTemplate(), equalTo("id1")); + assertThat(info.getDataStreamStatus(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(info.getIlmPolicy(), nullValue()); + DataStream dataStream = info.getDataStream(); + assertThat(dataStream.getName(), equalTo("logs-foobar")); + assertThat(dataStream.getTimeStampField().getName(), equalTo("@timestamp")); + assertThat(dataStream.getIndices().size(), equalTo(1)); + assertThat(dataStream.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(dataStream.getMetadata(), equalTo(Map.of("managed_by", "core-features"))); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } @@ -1186,11 +1210,16 @@ private static void verifyDocs(String dataStream, long expectedNumHits, long min } public static void putComposableIndexTemplate(String id, List patterns) throws IOException { - putComposableIndexTemplate(id, null, patterns, null); + putComposableIndexTemplate(id, null, patterns, null, null); } - static void putComposableIndexTemplate(String id, @Nullable String mappings, List patterns, @Nullable Settings settings) - throws IOException { + static void putComposableIndexTemplate( + String id, + @Nullable String mappings, + List patterns, + @Nullable Settings settings, + @Nullable Map metadata + ) throws IOException { PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id); request.indexTemplate( new ComposableIndexTemplate( @@ -1199,7 +1228,7 @@ static void putComposableIndexTemplate(String id, @Nullable String mappings, Lis null, null, null, - null, + metadata, new ComposableIndexTemplate.DataStreamTemplate(), null ) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 46ac294be3efa..7bd17251cb01e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -523,7 +523,7 @@ private void givenClusterStateWithDatastream(String dataStreamName, clusterState = ClusterState.builder(new ClusterName("cluster_name")) .metadata(new Metadata.Builder() - .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), Collections.singletonList(index), 1L)) + .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), Collections.singletonList(index))) .putCustom(PersistentTasksCustomMetadata.TYPE, tasks) .putCustom(MlMetadata.TYPE, mlMetadata) .put(indexMetadata, false)) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml index 2c0a48acc2a1e..235553581f864 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml @@ -400,6 +400,48 @@ setup: name: logs-foobar - is_true: acknowledged +--- +"Include metadata in a data stream": + - skip: + version: " - 7.99.99" + reason: "re-enable in 7.11 when backported" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" + indices.put_index_template: + name: generic_logs_template + body: + index_patterns: logs-* + _meta: + managed_by: 'core-features' + managed: true + template: + settings: + number_of_replicas: 0 + data_stream: {} + + - do: + indices.create_data_stream: + name: logs-foobar + - is_true: acknowledged + + - do: + indices.get_data_stream: + name: "*" + - length: { data_streams: 1 } + - match: { data_streams.0.name: 'logs-foobar' } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - match: { data_streams.0.generation: 1 } + - length: { data_streams.0.indices: 1 } + - match: { data_streams.0.indices.0.index_name: '.ds-logs-foobar-000001' } + - match: { data_streams.0.status: 'GREEN' } + - match: { data_streams.0.template: 'generic_logs_template' } + - length: { data_streams.0._meta: 2 } + - match: { data_streams.0._meta.managed: true } + - match: { data_streams.0._meta.managed_by: 'core-features' } + --- "Create index into a namespace that is governed by a data stream template": - skip: