Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom metadata support to data streams. #64169

Merged
merged 2 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@ public final class DataStream {
String indexTemplate;
@Nullable
String ilmPolicyName;
@Nullable
private final Map<String, Object> metadata;

public DataStream(String name, String timeStampField, List<String> indices, long generation, ClusterHealthStatus dataStreamStatus,
@Nullable String indexTemplate, @Nullable String ilmPolicyName) {
@Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable Map<String, Object> metadata) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = indices;
this.generation = generation;
this.dataStreamStatus = dataStreamStatus;
this.indexTemplate = indexTemplate;
this.ilmPolicyName = ilmPolicyName;
this.metadata = metadata;
}

public String getName() {
Expand Down Expand Up @@ -81,13 +84,18 @@ public String getIlmPolicyName() {
return ilmPolicyName;
}

/**
 * Returns the custom metadata attached to this data stream, parsed from the
 * {@code _meta} field of the GET data stream response. May be {@code null} when
 * the response contained no {@code _meta} field (it is declared as an optional
 * constructor arg in the parser).
 *
 * NOTE(review): consider annotating this getter {@code @Nullable} for
 * consistency with the server-side {@code DataStream#getMetadata()} — the
 * backing field is already declared {@code @Nullable}.
 */
public Map<String, Object> getMetadata() {
return metadata;
}

public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
public static final ParseField INDICES_FIELD = new ParseField("indices");
public static final ParseField GENERATION_FIELD = new ParseField("generation");
public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
public static final ParseField METADATA_FIELD = new ParseField("_meta");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
Expand All @@ -101,7 +109,8 @@ public String getIlmPolicyName() {
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
String indexTemplate = (String) args[5];
String ilmPolicy = (String) args[6];
return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy);
Map<String, Object> metadata = (Map<String, Object>) args[7];
return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata);
});

static {
Expand All @@ -112,6 +121,7 @@ public String getIlmPolicyName() {
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_TEMPLATE_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
Expand All @@ -129,11 +139,12 @@ public boolean equals(Object o) {
indices.equals(that.indices) &&
dataStreamStatus == that.dataStreamStatus &&
Objects.equals(indexTemplate, that.indexTemplate) &&
Objects.equals(ilmPolicyName, that.ilmPolicyName);
Objects.equals(ilmPolicyName, that.ilmPolicyName) &&
Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName);
return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,25 @@

package org.elasticsearch.client.indices;

import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;

public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetDataStreamAction.Response, GetDataStreamResponse> {

private static List<Index> randomIndexInstances() {
int numIndices = randomIntBetween(0, 128);
List<Index> indices = new ArrayList<>(numIndices);
for (int i = 0; i < numIndices; i++) {
indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())));
}
return indices;
}

private static DataStreamInfo randomInstance() {
List<Index> indices = randomIndexInstances();
long generation = indices.size() + randomLongBetween(1, 128);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
DataStream dataStream = DataStreamTestHelper.randomInstance();
return new DataStreamInfo(dataStream, ClusterHealthStatus.YELLOW, randomAlphaOfLengthBetween(2, 10),
randomAlphaOfLengthBetween(2, 10));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -35,6 +37,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
Expand All @@ -45,18 +48,20 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
private final TimestampField timeStampField;
private final List<Index> indices;
private final long generation;
private final Map<String, Object> metadata;

public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation) {
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = Collections.unmodifiableList(indices);
this.generation = generation;
this.metadata = metadata;
assert indices.size() > 0;
assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation));
}

public DataStream(String name, TimestampField timeStampField, List<Index> indices) {
this(name, timeStampField, indices, indices.size());
this(name, timeStampField, indices, indices.size(), null);
}

public String getName() {
Expand All @@ -75,6 +80,11 @@ public long getGeneration() {
return generation;
}

/**
 * Returns the custom metadata for this data stream, as copied from the matching
 * composable index template's {@code _meta} section when the stream was created.
 * May be {@code null} when the template defined no metadata; serialized over the
 * wire only for nodes on or after {@code Version.V_7_11_0}.
 */
@Nullable
public Map<String, Object> getMetadata() {
return metadata;
}

/**
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
Expand All @@ -87,7 +97,7 @@ public DataStream rollover(Index newWriteIndex) {
assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(newWriteIndex);
return new DataStream(name, timeStampField, backingIndices, generation + 1);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata);
}

/**
Expand All @@ -101,7 +111,7 @@ public DataStream removeBackingIndex(Index index) {
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.remove(index);
assert backingIndices.size() == indices.size() - 1;
return new DataStream(name, timeStampField, backingIndices, generation);
return new DataStream(name, timeStampField, backingIndices, generation, metadata);
}

/**
Expand All @@ -126,7 +136,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
"it is the write index", existingBackingIndex.getName(), name));
}
backingIndices.set(backingIndexPosition, newBackingIndex);
return new DataStream(name, timeStampField, backingIndices, generation);
return new DataStream(name, timeStampField, backingIndices, generation, metadata);
}

/**
Expand All @@ -142,7 +152,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene
}

public DataStream(StreamInput in) throws IOException {
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong());
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null);
}

public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -155,22 +166,28 @@ public void writeTo(StreamOutput out) throws IOException {
timeStampField.writeTo(out);
out.writeList(indices);
out.writeVLong(generation);
if (out.getVersion().onOrAfter(Version.V_7_11_0)) {
out.writeMap(metadata);
}
}

public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
public static final ParseField INDICES_FIELD = new ParseField("indices");
public static final ParseField GENERATION_FIELD = new ParseField("generation");
public static final ParseField METADATA_FIELD = new ParseField("_meta");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3]));
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
(Map<String, Object>) args[4]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
Expand All @@ -184,6 +201,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
builder.field(INDICES_FIELD.getPreferredName(), indices);
builder.field(GENERATION_FIELD.getPreferredName(), generation);
if (metadata != null) {
builder.field(METADATA_FIELD.getPreferredName(), metadata);
}
builder.endObject();
return builder;
}
Expand All @@ -196,12 +216,13 @@ public boolean equals(Object o) {
return name.equals(that.name) &&
timeStampField.equals(that.timeStampField) &&
indices.equals(that.indices) &&
generation == that.generation;
generation == that.generation &&
Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation);
return Objects.hash(name, timeStampField, indices, generation, metadata);
}

public static final class TimestampField implements Writeable, ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -162,8 +163,10 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn

String fieldName = template.getDataStreamTemplate().getTimestampField();
DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName);
DataStream newDataStream = new DataStream(request.name, timestampField,
Collections.singletonList(firstBackingIndex.getIndex()));
DataStream newDataStream =
new DataStream(request.name, timestampField,
Collections.singletonList(firstBackingIndex.getIndex()), 1L,
template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
List<Index> updatedIndices = dataStream.getIndices().stream()
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
.collect(Collectors.toList());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
dataStream.getMetadata());
}

public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1728,7 +1728,7 @@ public void testIndicesAliasesRequestTargetDataStreams() {
Metadata.Builder mdBuilder = Metadata.builder()
.put(backingIndex, false)
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
org.elasticsearch.common.collect.List.of(backingIndex.getIndex()), 1));
org.elasticsearch.common.collect.List.of(backingIndex.getIndex())));
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();

{
Expand Down Expand Up @@ -1918,7 +1918,7 @@ public void testDataStreams() {
.put(index1, false)
.put(index2, false)
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex()), 2));
org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex())));
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
backingIndices.add(im.getIndex());
}

b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
Metadata metadata = b.build();
assertThat(metadata.dataStreams().size(), equalTo(1));
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
Expand All @@ -1072,7 +1072,7 @@ public void testBuildIndicesLookupForDataStreams() {
indices.add(idx.getIndex());
b.put(idx, true);
}
b.put(new DataStream(name, createTimestampField("@timestamp"), indices, indices.size()));
b.put(new DataStream(name, createTimestampField("@timestamp"), indices));
}

Metadata metadata = b.build();
Expand Down Expand Up @@ -1138,8 +1138,7 @@ public void testValidateDataStreamsThrowsExceptionOnConflict() {
DataStream dataStream = new DataStream(
dataStreamName,
createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
backingIndices.size()
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
);

IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
Expand Down Expand Up @@ -1212,8 +1211,7 @@ public void testValidateDataStreamsAllowsPrefixedBackingIndices() {
DataStream dataStream = new DataStream(
dataStreamName,
createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
backingIndices.size()
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
);

IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
Expand Down Expand Up @@ -1313,7 +1311,7 @@ private static CreateIndexResult createIndices(int numIndices, int numBackingInd
b.put(im, false);
backingIndices.add(im.getIndex());
}
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
return new CreateIndexResult(indices, backingIndices, b.build());
}

Expand Down
Loading