From d7ad78758182122dc86358f1983209dd83b7a52e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 26 Oct 2020 15:04:26 +0100 Subject: [PATCH 1/2] Add custom metadata support to data steams. (#63991) Composable index template may hold custom metadata. This change adds behaviour that when a data stream gets created the custom metadata of the matching composable index template is copied to new data stream. The get data stream api can then be used to view the custom metadata. Example: ``` PUT /_index_template/my-logs-template { "index_patterns": [ "logs-*" ], "data_stream": { }, "template": { "settings": { "index.number_of_replicas": 0 } }, "_meta": { "managed": true } } PUT /_data_stream/logs-myapp GET /_data_stream ``` The get data stream api then yields the following response: ``` { "data_streams": [ { "name": "logs-myapp", "timestamp_field": { "name": "@timestamp" }, "indices": [ { "index_name": ".ds-logs-myapp-000001", "index_uuid": "3UaBxM3mQXuHR6qx0IDVCw" } ], "generation": 1, "_meta": { "managed": true }, "status": "GREEN", "template": "my-logs-template" } ] } ``` Closes #59195 --- .../client/indices/DataStream.java | 19 +++++++-- .../indices/GetDataStreamResponseTests.java | 26 ++---------- .../cluster/metadata/DataStream.java | 39 +++++++++++++---- .../MetadataCreateDataStreamService.java | 6 ++- .../snapshots/RestoreService.java | 3 +- .../IndexNameExpressionResolverTests.java | 4 +- .../cluster/metadata/MetadataTests.java | 12 +++--- .../cluster/DataStreamTestHelper.java | 11 ++++- .../core/action/GetDataStreamAction.java | 3 ++ .../xpack/core/ilm/RolloverStepTests.java | 2 +- .../UpdateRolloverLifecycleDateStepTests.java | 2 +- .../core/ilm/WaitForActiveShardsTests.java | 3 +- .../ilm/WaitForRolloverReadyStepTests.java | 2 +- .../datastreams/DataStreamIT.java | 39 +++++++++++++---- .../datafeed/DatafeedNodeSelectorTests.java | 2 +- .../test/data_stream/10_basic.yml | 42 +++++++++++++++++++ 16 files changed, 153 insertions(+), 62 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java index c7d4c61d420cc..76b5136bedaa6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java @@ -41,9 +41,11 @@ public final class DataStream { String indexTemplate; @Nullable String ilmPolicyName; + @Nullable + private final Map metadata; public DataStream(String name, String timeStampField, List indices, long generation, ClusterHealthStatus dataStreamStatus, - @Nullable String indexTemplate, @Nullable String ilmPolicyName) { + @Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable Map metadata) { this.name = name; this.timeStampField = timeStampField; this.indices = indices; @@ -51,6 +53,7 @@ public DataStream(String name, String timeStampField, List indices, long this.dataStreamStatus = dataStreamStatus; this.indexTemplate = indexTemplate; this.ilmPolicyName = ilmPolicyName; + this.metadata = metadata; } public String getName() { @@ -81,6 +84,10 @@ public String getIlmPolicyName() { return ilmPolicyName; } + public Map getMetadata() { + return metadata; + } + public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); public static final ParseField INDICES_FIELD = new ParseField("indices"); @@ -88,6 +95,7 @@ public String getIlmPolicyName() { public static final ParseField STATUS_FIELD = new ParseField("status"); public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy"); + public static final ParseField METADATA_FIELD = new ParseField("_meta"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", @@ -101,7 +109,8 @@ public String getIlmPolicyName() { ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr); String indexTemplate = (String) args[5]; String ilmPolicy = (String) args[6]; - return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy); + Map metadata = (Map) args[7]; + return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata); }); static { @@ -112,6 +121,7 @@ public String getIlmPolicyName() { PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_TEMPLATE_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -129,11 +139,12 @@ public boolean equals(Object o) { indices.equals(that.indices) && dataStreamStatus == that.dataStreamStatus && Objects.equals(indexTemplate, that.indexTemplate) && - Objects.equals(ilmPolicyName, that.ilmPolicyName); + Objects.equals(ilmPolicyName, that.ilmPolicyName) && + Objects.equals(metadata, that.metadata); } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName); + return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java index 34b5b2af9bf74..3dbcb8afe68d0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java @@ -19,43 +19,25 @@ package org.elasticsearch.client.indices; -import org.elasticsearch.xpack.core.action.GetDataStreamAction; -import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo; import org.elasticsearch.client.AbstractResponseTestCase; +import org.elasticsearch.cluster.DataStreamTestHelper; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.action.GetDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; -import java.util.List; -import java.util.Locale; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField; -import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; - public class GetDataStreamResponseTests extends AbstractResponseTestCase { - private static List randomIndexInstances() { - int numIndices = randomIntBetween(0, 128); - List indices = new ArrayList<>(numIndices); - for (int i = 0; i < numIndices; i++) { - indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()))); - } - return indices; - } - private static DataStreamInfo randomInstance() { - List indices = randomIndexInstances(); - long generation = indices.size() + randomLongBetween(1, 128); - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random()))); - DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation); + DataStream dataStream = DataStreamTestHelper.randomInstance(); return new DataStreamInfo(dataStream, ClusterHealthStatus.YELLOW, randomAlphaOfLengthBetween(2, 10), randomAlphaOfLengthBetween(2, 10)); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 2c3d446306b99..59dda946417f8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -18,8 +18,10 @@ */ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -35,6 +37,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; public final class DataStream extends AbstractDiffable implements ToXContentObject { @@ -45,18 +48,20 @@ public final class DataStream extends AbstractDiffable implements To private final TimestampField timeStampField; private final List indices; private final long generation; + private final Map metadata; - public DataStream(String name, TimestampField timeStampField, List indices, long generation) { + public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata) { this.name = name; this.timeStampField = timeStampField; this.indices = Collections.unmodifiableList(indices); this.generation = generation; + this.metadata = metadata; assert indices.size() > 0; assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation)); } public DataStream(String name, TimestampField timeStampField, List indices) { - this(name, timeStampField, indices, indices.size()); + this(name, timeStampField, indices, indices.size(), null); } public String getName() { @@ -75,6 +80,11 @@ public long getGeneration() { return generation; } + @Nullable + public Map getMetadata() { + return metadata; + } + /** * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. @@ -87,7 +97,7 @@ public DataStream rollover(Index newWriteIndex) { assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1)); List backingIndices = new ArrayList<>(indices); backingIndices.add(newWriteIndex); - return new DataStream(name, timeStampField, backingIndices, generation + 1); + return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata); } /** @@ -101,7 +111,7 @@ public DataStream removeBackingIndex(Index index) { List backingIndices = new ArrayList<>(indices); backingIndices.remove(index); assert backingIndices.size() == indices.size() - 1; - return new DataStream(name, timeStampField, backingIndices, generation); + return new DataStream(name, timeStampField, backingIndices, generation, metadata); } /** @@ -126,7 +136,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki "it is the write index", existingBackingIndex.getName(), name)); } backingIndices.set(backingIndexPosition, newBackingIndex); - return new DataStream(name, timeStampField, backingIndices, generation); + return new DataStream(name, timeStampField, backingIndices, generation, metadata); } /** @@ -142,7 +152,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene } public DataStream(StreamInput in) throws IOException { - this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong()); + this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(), + in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readMap(): null); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -155,22 +166,28 @@ public void writeTo(StreamOutput out) throws IOException { timeStampField.writeTo(out); out.writeList(indices); out.writeVLong(generation); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeMap(metadata); + } } public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); public static final ParseField INDICES_FIELD = new ParseField("indices"); public static final ParseField GENERATION_FIELD = new ParseField("generation"); + public static final ParseField METADATA_FIELD = new ParseField("_meta"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", - args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3])); + args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3], + (Map) args[4])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); PARSER.declareObject(ConstructingObjectParser.constructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD); PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -184,6 +201,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField); builder.field(INDICES_FIELD.getPreferredName(), indices); builder.field(GENERATION_FIELD.getPreferredName(), generation); + if (metadata != null) { + builder.field(METADATA_FIELD.getPreferredName(), metadata); + } builder.endObject(); return builder; } @@ -196,12 +216,13 @@ public boolean equals(Object o) { return name.equals(that.name) && timeStampField.equals(that.timeStampField) && indices.equals(that.indices) && - generation == that.generation; + generation == that.generation && + Objects.equals(metadata, that.metadata); } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation); + return Objects.hash(name, timeStampField, indices, generation, metadata); } public static final class TimestampField implements Writeable, ToXContentObject { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 628ba3a43522e..15dce6ea3e081 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -162,8 +162,10 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn String fieldName = template.getDataStreamTemplate().getTimestampField(); DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName); - DataStream newDataStream = new DataStream(request.name, timestampField, - Collections.singletonList(firstBackingIndex.getIndex())); + DataStream newDataStream = + new DataStream(request.name, timestampField, + Collections.singletonList(firstBackingIndex.getIndex()), 1L, + template.metadata() != null ? Map.copyOf(template.metadata()) : null); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); logger.info("adding data stream [{}]", request.name); return ClusterState.builder(currentState).metadata(builder).build(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index f6575befbdfd6..b9e398a6798cb 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -617,7 +617,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad List updatedIndices = dataStream.getIndices().stream() .map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex()) .collect(Collectors.toList()); - return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration()); + return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(), + dataStream.getMetadata()); } public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index 76a06207ffe0e..df3729eef9aa4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -1728,7 +1728,7 @@ public void testIndicesAliasesRequestTargetDataStreams() { Metadata.Builder mdBuilder = Metadata.builder() .put(backingIndex, false) .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), - org.elasticsearch.common.collect.List.of(backingIndex.getIndex()), 1)); + org.elasticsearch.common.collect.List.of(backingIndex.getIndex()))); ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); { @@ -1918,7 +1918,7 @@ public void testDataStreams() { .put(index1, false) .put(index2, false) .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), - org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex()), 2)); + org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex()))); ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 5614cbcaa2e72..59fbc2ff5292c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -1054,7 +1054,7 @@ public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() { backingIndices.add(im.getIndex()); } - b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum)); + b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null)); Metadata metadata = b.build(); assertThat(metadata.dataStreams().size(), equalTo(1)); assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); @@ -1072,7 +1072,7 @@ public void testBuildIndicesLookupForDataStreams() { indices.add(idx.getIndex()); b.put(idx, true); } - b.put(new DataStream(name, createTimestampField("@timestamp"), indices, indices.size())); + b.put(new DataStream(name, createTimestampField("@timestamp"), indices)); } Metadata metadata = b.build(); @@ -1138,8 +1138,7 @@ public void testValidateDataStreamsThrowsExceptionOnConflict() { DataStream dataStream = new DataStream( dataStreamName, createTimestampField("@timestamp"), - backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), - backingIndices.size() + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()) ); IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices); @@ -1212,8 +1211,7 @@ public void testValidateDataStreamsAllowsPrefixedBackingIndices() { DataStream dataStream = new DataStream( dataStreamName, createTimestampField("@timestamp"), - backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), - backingIndices.size() + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()) ); IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices); @@ -1313,7 +1311,7 @@ private static CreateIndexResult createIndices(int numIndices, int numBackingInd b.put(im, false); backingIndices.add(im.getIndex()); } - b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum)); + b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null)); return new CreateIndexResult(indices, backingIndices, b.build()); } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java index c8a7e3f1a8ec0..8f7871e736c1e 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java @@ -32,11 +32,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; +import static org.elasticsearch.test.ESTestCase.randomBoolean; public final class DataStreamTestHelper { @@ -103,7 +105,11 @@ public static DataStream randomInstance() { long generation = indices.size() + ESTestCase.randomLongBetween(1, 128); String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(LuceneTestCase.random()))); - return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation); + Map metadata = null; + if (randomBoolean()) { + metadata = Map.of("key", "value"); + } + return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata); } /** @@ -127,7 +133,8 @@ public static ClusterState getClusterStateWithDataStreams(List conditionsMet = new SetOnce<>(); Metadata metadata = Metadata.builder().put(indexMetadata, true) .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), - org.elasticsearch.common.collect.List.of(indexMetadata.getIndex()), 1L)) + org.elasticsearch.common.collect.List.of(indexMetadata.getIndex()))) .build(); step.evaluateCondition(metadata, indexMetadata.getIndex(), new AsyncWaitStep.Listener() { diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index f79f1969d0aa9..7ea71f7ce05ed 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -58,6 +58,7 @@ import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; import org.elasticsearch.xpack.core.action.GetDataStreamAction; +import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo; import org.elasticsearch.xpack.datastreams.DataStreamsPlugin; import org.junit.After; @@ -726,7 +727,7 @@ public void testTimestampFieldCustomAttributes() throws Exception { + " }\n" + " }\n" + " }"; - putComposableIndexTemplate("id1", mapping, List.of("logs-foo*"), null); + putComposableIndexTemplate("id1", mapping, List.of("logs-foo*"), null, null); CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar"); client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); @@ -911,7 +912,7 @@ public void testSearchAllResolvesDataStreams() throws Exception { public void testGetDataStream() throws Exception { Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, maximumNumberOfReplicas() + 2).build(); - putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings); + putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null); int numDocsFoo = randomIntBetween(2, 16); indexDocs("metrics-foo", numDocsFoo); @@ -921,7 +922,7 @@ public void testGetDataStream() throws Exception { new GetDataStreamAction.Request(new String[] { "metrics-foo" }) ).actionGet(); assertThat(response.getDataStreams().size(), is(1)); - GetDataStreamAction.Response.DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0); + DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0); assertThat(metricsFooDataStream.getDataStream().getName(), is("metrics-foo")); assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW)); assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo")); @@ -1115,6 +1116,29 @@ public void testQueryDataStreamNameInIndexField() throws Exception { assertThat(searchResponse.getHits().getTotalHits().relation, equalTo(TotalHits.Relation.EQUAL_TO)); } + public void testDataStreamMetadata() throws Exception { + Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build(); + putComposableIndexTemplate("id1", null, List.of("logs-*"), settings, Map.of("managed_by", "core-features")); + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar"); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { "*" }); + GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest) + .actionGet(); + getDataStreamResponse.getDataStreams().sort(Comparator.comparing(dataStreamInfo -> dataStreamInfo.getDataStream().getName())); + assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1)); + DataStreamInfo info = getDataStreamResponse.getDataStreams().get(0); + assertThat(info.getIndexTemplate(), equalTo("id1")); + assertThat(info.getDataStreamStatus(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(info.getIlmPolicy(), nullValue()); + DataStream dataStream = info.getDataStream(); + assertThat(dataStream.getName(), equalTo("logs-foobar")); + assertThat(dataStream.getTimeStampField().getName(), equalTo("@timestamp")); + assertThat(dataStream.getIndices().size(), equalTo(1)); + assertThat(dataStream.getIndices().get(0).getName(), equalTo(DataStream.getDefaultBackingIndexName("logs-foobar", 1))); + assertThat(dataStream.getMetadata(), equalTo(Map.of("managed_by", "core-features"))); + } + private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail) { verifyResolvability(dataStream, requestBuilder, fail, 0); } @@ -1188,14 +1212,15 @@ private static void verifyDocs(String dataStream, long expectedNumHits, long min } public static void putComposableIndexTemplate(String id, java.util.List patterns) throws IOException { - putComposableIndexTemplate(id, null, patterns, null); + putComposableIndexTemplate(id, null, patterns, null, null); } static void putComposableIndexTemplate( String id, @Nullable String mappings, - java.util.List patterns, - @Nullable Settings settings + List patterns, + @Nullable Settings settings, + @Nullable Map metadata ) throws IOException { PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id); request.indexTemplate( @@ -1205,7 +1230,7 @@ static void putComposableIndexTemplate( null, null, null, - null, + metadata, new ComposableIndexTemplate.DataStreamTemplate() ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 46ac294be3efa..7bd17251cb01e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -523,7 +523,7 @@ private void givenClusterStateWithDatastream(String dataStreamName, clusterState = ClusterState.builder(new ClusterName("cluster_name")) .metadata(new Metadata.Builder() - .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), Collections.singletonList(index), 1L)) + .put(new DataStream(dataStreamName, createTimestampField("@timestamp"), Collections.singletonList(index))) .putCustom(PersistentTasksCustomMetadata.TYPE, tasks) .putCustom(MlMetadata.TYPE, mlMetadata) .put(indexMetadata, false)) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml index 2c0a48acc2a1e..235553581f864 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_basic.yml @@ -400,6 +400,48 @@ setup: name: logs-foobar - is_true: acknowledged +--- +"Include metadata in a data stream": + - skip: + version: " - 7.99.99" + reason: "re-enable in 7.11 when backported" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation" + indices.put_index_template: + name: generic_logs_template + body: + index_patterns: logs-* + _meta: + managed_by: 'core-features' + managed: true + template: + settings: + number_of_replicas: 0 + data_stream: {} + + - do: + indices.create_data_stream: + name: logs-foobar + - is_true: acknowledged + + - do: + indices.get_data_stream: + name: "*" + - length: { data_streams: 1 } + - match: { data_streams.0.name: 'logs-foobar' } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - match: { data_streams.0.generation: 1 } + - length: { data_streams.0.indices: 1 } + - match: { data_streams.0.indices.0.index_name: '.ds-logs-foobar-000001' } + - match: { data_streams.0.status: 'GREEN' } + - match: { data_streams.0.template: 'generic_logs_template' } + - length: { data_streams.0._meta: 2 } + - match: { data_streams.0._meta.managed: true } + - match: { data_streams.0._meta.managed_by: 'core-features' } + --- "Create index into a namespace that is governed by a data stream template": - skip: From 27d7537fcef52deb60b9cf584a2f8e9e0ed7d609 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 26 Oct 2020 15:27:24 +0100 Subject: [PATCH 2/2] iter --- .../java/org/elasticsearch/cluster/metadata/DataStream.java | 4 ++-- .../cluster/metadata/MetadataCreateDataStreamService.java | 3 ++- .../java/org/elasticsearch/cluster/DataStreamTestHelper.java | 4 +++- .../java/org/elasticsearch/datastreams/DataStreamIT.java | 4 ++-- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 59dda946417f8..0209682b2ad1e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -153,7 +153,7 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene public DataStream(StreamInput in) throws IOException { this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(), - in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readMap(): null); + in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -166,7 +166,7 @@ public void writeTo(StreamOutput out) throws IOException { timeStampField.writeTo(out); out.writeList(indices); out.writeVLong(generation); - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (out.getVersion().onOrAfter(Version.V_7_11_0)) { out.writeMap(metadata); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 15dce6ea3e081..f0748a40b57f1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -45,6 +45,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -165,7 +166,7 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn DataStream newDataStream = new DataStream(request.name, timestampField, Collections.singletonList(firstBackingIndex.getIndex()), 1L, - template.metadata() != null ? Map.copyOf(template.metadata()) : null); + template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); logger.info("adding data stream [{}]", request.name); return ClusterState.builder(currentState).metadata(builder).build(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java index 8f7871e736c1e..872532d52a2c7 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -107,7 +108,8 @@ public static DataStream randomInstance() { indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(LuceneTestCase.random()))); Map metadata = null; if (randomBoolean()) { - metadata = Map.of("key", "value"); + metadata = new HashMap<>(); + metadata.put("key", "value"); } return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata); } diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 7ea71f7ce05ed..d429543f8ded5 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -1218,9 +1218,9 @@ public static void putComposableIndexTemplate(String id, java.util.List static void putComposableIndexTemplate( String id, @Nullable String mappings, - List patterns, + java.util.List patterns, @Nullable Settings settings, - @Nullable Map metadata + @Nullable java.util.Map metadata ) throws IOException { PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id); request.indexTemplate(