Skip to content

Adding mappings to data streams #129787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -261,13 +262,17 @@ static GetDataStreamAction.Response innerOperation(
Settings settings = dataStream.getEffectiveSettings(state.metadata());
ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME);
if (indexMode == null && state.metadata().templatesV2().get(indexTemplate) != null) {
indexMode = resolveMode(
state,
indexSettingProviders,
dataStream,
settings,
dataStream.getEffectiveIndexTemplate(state.metadata())
);
try {
indexMode = resolveMode(
state,
indexSettingProviders,
dataStream,
settings,
dataStream.getEffectiveIndexTemplate(state.metadata())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public void testUpdateTimeSeriesTemporalOneBadDataStream() {
2,
ds2.getMetadata(),
ds2.getSettings(),
ds2.getMappings(),
ds2.isHidden(),
ds2.isReplicated(),
ds2.isSystem(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_BATCH_SIZE = def(9_103_0_00);
public static final TransportVersion STREAMS_LOGS_SUPPORT = def(9_104_0_00);
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_INPUT_TYPE = def(9_105_0_00);
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_106_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -51,6 +57,14 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
private static final ParseField ALLOW_AUTO_CREATE = new ParseField("allow_auto_create");
private static final ParseField IGNORE_MISSING_COMPONENT_TEMPLATES = new ParseField("ignore_missing_component_templates");
private static final ParseField DEPRECATED = new ParseField("deprecated");
public static final CompressedXContent EMPTY_MAPPINGS;
static {
try {
EMPTY_MAPPINGS = new CompressedXContent(Map.of());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<ComposableIndexTemplate, Void> PARSER = new ConstructingObjectParser<>(
Expand Down Expand Up @@ -338,6 +352,64 @@ public ComposableIndexTemplate mergeSettings(Settings settings) {
return mergedIndexTemplateBuilder.build();
}

public ComposableIndexTemplate mergeMappings(CompressedXContent mappings) throws IOException {
Objects.requireNonNull(mappings);
if (Mapping.EMPTY.toCompressedXContent().equals(mappings) && this.template() != null && this.template().mappings() != null) {
return this;
}
ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = this.toBuilder();
Template.Builder mergedTemplateBuilder;
CompressedXContent templateMappings;
if (this.template() == null) {
mergedTemplateBuilder = Template.builder();
templateMappings = null;
} else {
mergedTemplateBuilder = Template.builder(this.template());
templateMappings = this.template().mappings();
}
mergedTemplateBuilder.mappings(templateMappings == null ? mappings : merge(templateMappings, mappings));
mergedIndexTemplateBuilder.template(mergedTemplateBuilder);
return mergedIndexTemplateBuilder.build();
}

@SuppressWarnings("unchecked")
private CompressedXContent merge(CompressedXContent originalMapping, CompressedXContent mappingAddition) throws IOException {
Map<String, Object> mappingAdditionMap = XContentHelper.convertToMap(mappingAddition.uncompressed(), true, XContentType.JSON).v2();
Map<String, Object> combinedMappingMap = new HashMap<>();
if (originalMapping != null) {
Map<String, Object> originalMappingMap = XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON)
.v2();
if (originalMappingMap.containsKey(MapperService.SINGLE_MAPPING_NAME)) {
combinedMappingMap.putAll((Map<String, ?>) originalMappingMap.get(MapperService.SINGLE_MAPPING_NAME));
} else {
combinedMappingMap.putAll(originalMappingMap);
}
}
XContentHelper.update(combinedMappingMap, mappingAdditionMap, true);
return convertMappingMapToXContent(combinedMappingMap);
}

private static CompressedXContent convertMappingMapToXContent(Map<String, Object> rawAdditionalMapping) throws IOException {
CompressedXContent compressedXContent;
if (rawAdditionalMapping.isEmpty()) {
compressedXContent = EMPTY_MAPPINGS;
} else {
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) {
compressedXContent = mappingFromXContent(parser);
}
}
return compressedXContent;
}

private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered())));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}

@Override
public int hashCode() {
return Objects.hash(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -47,9 +48,11 @@
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -58,6 +61,7 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand All @@ -70,6 +74,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
Expand All @@ -89,6 +94,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
public static final String FAILURE_STORE_PREFIX = ".fs-";
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
public static final String TIMESTAMP_FIELD_NAME = "@timestamp";

// Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
try {
Expand Down Expand Up @@ -120,6 +126,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
@Nullable
private final Map<String, Object> metadata;
private final Settings settings;
private final CompressedXContent mappings;
private final boolean hidden;
private final boolean replicated;
private final boolean system;
Expand Down Expand Up @@ -156,6 +163,7 @@ public DataStream(
generation,
metadata,
Settings.EMPTY,
EMPTY_MAPPINGS,
hidden,
replicated,
system,
Expand All @@ -176,6 +184,7 @@ public DataStream(
long generation,
Map<String, Object> metadata,
Settings settings,
CompressedXContent mappings,
boolean hidden,
boolean replicated,
boolean system,
Expand All @@ -192,6 +201,7 @@ public DataStream(
generation,
metadata,
settings,
mappings,
hidden,
replicated,
system,
Expand All @@ -210,6 +220,7 @@ public DataStream(
long generation,
Map<String, Object> metadata,
Settings settings,
CompressedXContent mappings,
boolean hidden,
boolean replicated,
boolean system,
Expand All @@ -225,6 +236,7 @@ public DataStream(
this.generation = generation;
this.metadata = metadata;
this.settings = Objects.requireNonNull(settings);
this.mappings = Objects.requireNonNull(mappings);
assert system == false || hidden; // system indices must be hidden
this.hidden = hidden;
this.replicated = replicated;
Expand Down Expand Up @@ -286,11 +298,18 @@ public static DataStream read(StreamInput in) throws IOException {
} else {
settings = Settings.EMPTY;
}
CompressedXContent mappings;
if (in.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) {
mappings = CompressedXContent.readCompressedString(in);
} else {
mappings = EMPTY_MAPPINGS;
}
return new DataStream(
name,
generation,
metadata,
settings,
mappings,
hidden,
replicated,
system,
Expand Down Expand Up @@ -381,8 +400,8 @@ public boolean rolloverOnWrite() {
return backingIndices.rolloverOnWrite;
}

public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) {
return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings);
public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) throws IOException {
return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings).mergeMappings(mappings);
}

public Settings getEffectiveSettings(ProjectMetadata projectMetadata) {
Expand All @@ -391,6 +410,10 @@ public Settings getEffectiveSettings(ProjectMetadata projectMetadata) {
return templateSettings.merge(settings);
}

public CompressedXContent getEffectiveMappings(ProjectMetadata projectMetadata) throws IOException {
return getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings();
}

private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) {
return lookupTemplateForDataStream(name, projectMetadata);
}
Expand Down Expand Up @@ -510,6 +533,10 @@ public Settings getSettings() {
return settings;
}

public CompressedXContent getMappings() {
return mappings;
}

@Override
public boolean isHidden() {
return hidden;
Expand Down Expand Up @@ -1354,6 +1381,9 @@ public void writeTo(StreamOutput out) throws IOException {
|| out.getTransportVersion().isPatchFrom(TransportVersions.SETTINGS_IN_DATA_STREAMS_8_19)) {
settings.writeTo(out);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) {
mappings.writeTo(out);
}
}

public static final ParseField NAME_FIELD = new ParseField("name");
Expand All @@ -1376,6 +1406,7 @@ public void writeTo(StreamOutput out) throws IOException {
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options");
public static final ParseField SETTINGS_FIELD = new ParseField("settings");
public static final ParseField MAPPINGS_FIELD = new ParseField("mappings");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>(
Expand All @@ -1385,6 +1416,7 @@ public void writeTo(StreamOutput out) throws IOException {
(Long) args[2],
(Map<String, Object>) args[3],
args[17] == null ? Settings.EMPTY : (Settings) args[17],
args[18] == null ? EMPTY_MAPPINGS : (CompressedXContent) args[18],
args[4] != null && (boolean) args[4],
args[5] != null && (boolean) args[5],
args[6] != null && (boolean) args[6],
Expand Down Expand Up @@ -1456,6 +1488,18 @@ public void writeTo(StreamOutput out) throws IOException {
DATA_STREAM_OPTIONS_FIELD
);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
XContentParser.Token token = p.currentToken();
if (token == XContentParser.Token.VALUE_STRING) {
return new CompressedXContent(Base64.getDecoder().decode(p.text()));
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
return new CompressedXContent(p.binaryValue());
} else if (token == XContentParser.Token.START_OBJECT) {
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(p.mapOrdered())));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}, MAPPINGS_FIELD, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
Expand Down Expand Up @@ -1520,6 +1564,20 @@ public XContentBuilder toXContent(
builder.startObject(SETTINGS_FIELD.getPreferredName());
this.settings.toXContent(builder, params);
builder.endObject();

String context = params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API);
boolean binary = params.paramAsBoolean("binary", false);
if (Metadata.CONTEXT_MODE_API.equals(context) || binary == false) {
Map<String, Object> uncompressedMapping = XContentHelper.convertToMap(this.mappings.uncompressed(), true, XContentType.JSON)
.v2();
if (uncompressedMapping.isEmpty() == false) {
builder.field(MAPPINGS_FIELD.getPreferredName());
builder.map(uncompressedMapping);
}
} else {
builder.field(MAPPINGS_FIELD.getPreferredName(), mappings.compressed());
}

builder.endObject();
return builder;
}
Expand Down Expand Up @@ -1864,6 +1922,7 @@ public static class Builder {
@Nullable
private Map<String, Object> metadata = null;
private Settings settings = Settings.EMPTY;
private CompressedXContent mappings = EMPTY_MAPPINGS;
private boolean hidden = false;
private boolean replicated = false;
private boolean system = false;
Expand Down Expand Up @@ -1892,6 +1951,7 @@ private Builder(DataStream dataStream) {
generation = dataStream.generation;
metadata = dataStream.metadata;
settings = dataStream.settings;
mappings = dataStream.mappings;
hidden = dataStream.hidden;
replicated = dataStream.replicated;
system = dataStream.system;
Expand Down Expand Up @@ -1928,6 +1988,11 @@ public Builder setSettings(Settings settings) {
return this;
}

public Builder setMappings(CompressedXContent mappings) {
this.mappings = mappings;
return this;
}

public Builder setHidden(boolean hidden) {
this.hidden = hidden;
return this;
Expand Down Expand Up @@ -1989,6 +2054,7 @@ public DataStream build() {
generation,
metadata,
settings,
mappings,
hidden,
replicated,
system,
Expand Down
Loading