Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto generate index.routing_path from mapping #86790

Merged
merged 11 commits into from
May 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -28,6 +29,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class TSDBIndexingIT extends ESSingleNodeTestCase {
Expand Down Expand Up @@ -76,12 +78,15 @@ public void testTimeRanges() throws Exception {
}
}
}""";
Settings templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build();
var templateSettings = Settings.builder().put("index.mode", "time_series");
if (randomBoolean()) {
templateSettings.put("index.routing_path", "metricset");
}
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(templateSettings, new CompressedXContent(mappingTemplate), null),
new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null),
null,
null,
null,
Expand Down Expand Up @@ -176,34 +181,60 @@ public void testInvalidTsdbTemplatesNoTimeSeriesDimensionAttribute() throws Exce
}
}
}""";
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(
Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build(),
new CompressedXContent(mappingTemplate),
{
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(
Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build(),
new CompressedXContent(mappingTemplate),
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
Exception e = expectThrows(
IllegalArgumentException.class,
() -> client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet()
);
assertThat(
e.getCause().getCause().getMessage(),
equalTo(
"All fields that match routing_path must be keywords with [time_series_dimension: true] and "
+ "without the [script] parameter. [metricset] was not [time_series_dimension: true]."
)
);
)
);
var e = expectThrows(
IllegalArgumentException.class,
() -> client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet()
);
assertThat(
e.getCause().getCause().getMessage(),
equalTo(
"All fields that match routing_path must be keywords with [time_series_dimension: true] and "
+ "without the [script] parameter. [metricset] was not [time_series_dimension: true]."
)
);
}
{
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(
Settings.builder().put("index.mode", "time_series").build(),
new CompressedXContent(mappingTemplate),
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
var e = expectThrows(
InvalidIndexTemplateException.class,
() -> client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet()
);
assertThat(e.getMessage(), containsString("[index.mode=time_series] requires a non-empty [index.routing_path]"));
}
}

public void testInvalidTsdbTemplatesNoKeywordFieldType() throws Exception {
Expand Down Expand Up @@ -260,54 +291,28 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception {
}
}
}""";
{
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(
Settings.builder().put("index.mode", "time_series").build(),
new CompressedXContent(mappingTemplate),
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
var e = expectThrows(
IllegalArgumentException.class,
() -> client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet()
);
assertThat(e.getMessage(), equalTo("[index.mode=time_series] requires a non-empty [index.routing_path]"));
}
{
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(
Settings.builder().put("index.routing_path", "metricset").build(),
new CompressedXContent(mappingTemplate),
null
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(
Settings.builder().put("index.routing_path", "metricset").build(),
new CompressedXContent(mappingTemplate),
null
)
);
var e = expectThrows(
IllegalArgumentException.class,
() -> client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet()
);
assertThat(e.getMessage(), equalTo("[index.routing_path] requires [index.mode=time_series]"));
}
),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false),
null
)
);
var e = expectThrows(
IllegalArgumentException.class,
() -> client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet()
);
assertThat(e.getCause().getMessage(), equalTo("[index.routing_path] requires [index.mode=time_series]"));
}

static String formatInstant(Instant instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -42,8 +42,7 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
"index": {
"number_of_replicas": 0,
"number_of_shards": 2,
"mode": "time_series",
"routing_path": ["metricset", "time_series_dimension"]
"mode": "time_series"
}
},
"mappings":{
Expand Down Expand Up @@ -328,7 +327,7 @@ public void testSimulateTsdbDataStreamTemplate() throws Exception {
assertThat(ObjectPath.evaluate(responseBody, "template.settings.index.time_series.end_time"), notNullValue());
assertThat(
ObjectPath.evaluate(responseBody, "template.settings.index.routing_path"),
contains("metricset", "time_series_dimension")
containsInAnyOrder("metricset", "k8s.pod.uid")
);
assertThat(ObjectPath.evaluate(responseBody, "overlapping"), empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,52 @@
*/
package org.elasticsearch.datastreams;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MapperService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_PATH;

public class DataStreamIndexSettingsProvider implements IndexSettingProvider {

static final DateFormatter FORMATTER = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;

private final CheckedFunction<IndexMetadata, MapperService, IOException> mapperServiceFactory;

/**
 * Creates the provider and stores the factory that is later used to obtain a
 * {@link MapperService} for a given {@link IndexMetadata}.
 *
 * @param mapperServiceFactory function producing a {@link MapperService} from an {@link IndexMetadata};
 *                             it may throw {@link IOException}
 */
public DataStreamIndexSettingsProvider(CheckedFunction<IndexMetadata, MapperService, IOException> mapperServiceFactory) {
this.mapperServiceFactory = mapperServiceFactory;
}

@Override
public Settings getAdditionalIndexSettings(
String indexName,
String dataStreamName,
boolean timeSeries,
Metadata metadata,
Instant resolvedAt,
Settings allSettings
Settings allSettings,
List<CompressedXContent> combinedTemplateMappings
) {
if (dataStreamName != null) {
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
Expand Down Expand Up @@ -84,6 +103,13 @@ public Settings getAdditionalIndexSettings(
assert start.isBefore(end) : "data stream backing index's start time is not before end time";
builder.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), FORMATTER.format(start));
builder.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), FORMATTER.format(end));

if (allSettings.hasValue(IndexMetadata.INDEX_ROUTING_PATH.getKey()) == false) {
List<String> routingPaths = findRoutingPaths(indexName, allSettings, combinedTemplateMappings);
if (routingPaths != null) {
builder.putList(INDEX_ROUTING_PATH.getKey(), routingPaths);
}
}
return builder.build();
}
}
Expand All @@ -92,4 +118,54 @@ public Settings getAdditionalIndexSettings(
return Settings.EMPTY;
}

// Finds fields in the mapping that are of type keyword and have time_series_dimension enabled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use javadoc for this. It'll show up when you mouseover the method in your IDE which is kind of nice.

// Using MapperService here has an overhead, but it allows the mappings from the templates to
// be merged correctly and the fields to be fetched without manually parsing the mappings.
//
// Alternatively, this method could parse the mappings into a map of maps, merge those, and
// iterate over all values to find the fields that can serve as routing values. But that would
// require mapping-specific logic to exist here.
/**
 * Collects the names of all keyword fields that are marked as dimensions once the given
 * template mappings have been merged into a temporary {@link MapperService}.
 * <p>
 * The temporary index metadata is built from {@code allSettings}, with the index mode forced to
 * {@link IndexMode#STANDARD} so that the absence of {@code index.routing_path} does not cause a failure.
 *
 * @param indexName                name used for the temporary index metadata
 * @param allSettings              settings the temporary index metadata is derived from
 * @param combinedTemplateMappings mappings that are merged, in iteration order, into the temporary mapper service
 * @return the names of the matching fields, or {@code null} when no field matches
 * @throws UncheckedIOException if an {@link IOException} is raised while the mapper service is in use
 */
private List<String> findRoutingPaths(String indexName, Settings allSettings, List<CompressedXContent> combinedTemplateMappings) {
    var metadataBuilder = IndexMetadata.builder(indexName);

    // Shard and replica counts only need to be valid for the throwaway metadata.
    int partitionSize = IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING.get(allSettings);
    int fallbackShards = partitionSize == 1 ? 1 : partitionSize + 1;
    int shardCount = allSettings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, fallbackShards);
    int replicaCount = allSettings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

    var settingsBuilder = Settings.builder();
    settingsBuilder.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT);
    settingsBuilder.put(allSettings);
    settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount);
    settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount);
    settingsBuilder.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
    // Avoid failing because index.routing_path is missing
    settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.STANDARD);
    metadataBuilder.settings(settingsBuilder.build());

    // The MapperService exists only long enough to extract the keyword dimension fields.
    try (var mapperService = mapperServiceFactory.apply(metadataBuilder.build())) {
        for (var templateMapping : combinedTemplateMappings) {
            mapperService.merge(MapperService.SINGLE_MAPPING_NAME, templateMapping, MapperService.MergeReason.INDEX_TEMPLATE);
        }

        // Stays null unless at least one dimension field is found.
        List<String> dimensionFields = null;
        for (var mapper : mapperService.documentMapper().mappers().fieldMappers()) {
            if (mapper instanceof KeywordFieldMapper keyword && keyword.fieldType().isDimension()) {
                if (dimensionFields == null) {
                    dimensionFields = new ArrayList<>();
                }
                dimensionFields.add(keyword.name());
            }
        }
        return dimensionFields;
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public List<RestHandler> getRestHandlers(
}

@Override
public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders() {
return List.of(new DataStreamIndexSettingsProvider());
public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders(IndexSettingProvider.Parameters parameters) {
return List.of(new DataStreamIndexSettingsProvider(parameters.mapperServiceFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.datastreams.MetadataDataStreamRolloverServiceTests.createSettingsProvider;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -257,7 +258,7 @@ public void setup() throws Exception {
null,
EmptySystemIndices.INSTANCE,
false,
new IndexSettingProviders(Set.of(new DataStreamIndexSettingsProvider()))
new IndexSettingProviders(Set.of(createSettingsProvider(xContentRegistry())))
);
}
{
Expand Down
Loading