diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 7ccbbcdc4f384..01936d21c75bd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -18,13 +18,16 @@ */ package org.elasticsearch.action.admin.indices.create; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; @@ -40,11 +43,15 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; /** @@ -52,6 +59,8 @@ */ public final class AutoCreateAction extends ActionType { + private static final Logger logger = LogManager.getLogger(AutoCreateAction.class); + public static final AutoCreateAction INSTANCE = new AutoCreateAction(); public static final String NAME = "indices:admin/auto_create"; @@ -65,15 +74,17 @@ public static final class TransportAction extends TransportMasterNodeAction { private final MetadataCreateIndexService createIndexService; + private final SystemIndices systemIndices; @Inject public TransportCreateIndexAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, MetadataCreateIndexService createIndexService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + SystemIndices systemIndices) { super(CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver, CreateIndexResponse::new, ThreadPool.Names.SAME); this.createIndexService = createIndexService; + this.systemIndices = systemIndices; } @Override @@ -58,20 +69,55 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt protected void masterOperation(Task task, final CreateIndexRequest request, final ClusterState state, final ActionListener listener) { String cause = request.cause(); - if (cause.length() == 0) { + if (cause.isEmpty()) { cause = "api"; } final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); - final CreateIndexClusterStateUpdateRequest updateRequest = - new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()) - .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) - .settings(request.settings()).mappings(request.mappings()) - .aliases(request.aliases()) - .waitForActiveShards(request.waitForActiveShards()); + + final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName); + final CreateIndexClusterStateUpdateRequest updateRequest = descriptor != null && descriptor.isAutomaticallyManaged() + ? buildSystemIndexUpdateRequest(request, cause, descriptor) + : buildUpdateRequest(request, cause, indexName); createIndexService.createIndex(updateRequest, listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))); } + private CreateIndexClusterStateUpdateRequest buildUpdateRequest(CreateIndexRequest request, String cause, String indexName) { + return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).ackTimeout(request.timeout()) + .masterNodeTimeout(request.masterNodeTimeout()) + .settings(request.settings()) + .mappings(request.mappings()) + .aliases(request.aliases()) + .waitForActiveShards(request.waitForActiveShards()); + } + + private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest( + CreateIndexRequest request, + String cause, + SystemIndexDescriptor descriptor + ) { + final Settings settings = Objects.requireNonNullElse(descriptor.getSettings(), Settings.EMPTY); + + final Set aliases; + if (descriptor.getAliasName() == null) { + aliases = Set.of(); + } else { + aliases = Set.of(new Alias(descriptor.getAliasName())); + } + + final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( + cause, + descriptor.getPrimaryIndex(), + request.index() + ); + + return updateRequest.ackTimeout(request.timeout()) + .masterNodeTimeout(request.masterNodeTimeout()) + .aliases(aliases) + .waitForActiveShards(ActiveShardCount.ALL) + .mappings(descriptor.getMappings()) + .settings(settings); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index a8b2f3483e6c1..51b1387f37825 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -36,6 +36,8 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -55,6 +57,7 @@ public class TransportPutMappingAction extends AcknowledgedTransportMasterNodeAc private final MetadataMappingService metadataMappingService; private final RequestValidators requestValidators; + private final SystemIndices systemIndices; @Inject public TransportPutMappingAction( @@ -64,11 +67,13 @@ public TransportPutMappingAction( final MetadataMappingService metadataMappingService, final ActionFilters actionFilters, final IndexNameExpressionResolver indexNameExpressionResolver, - final RequestValidators requestValidators) { + final RequestValidators requestValidators, + final SystemIndices systemIndices) { super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, PutMappingRequest::new, indexNameExpressionResolver, ThreadPool.Names.SAME); this.metadataMappingService = metadataMappingService; this.requestValidators = Objects.requireNonNull(requestValidators); + this.systemIndices = systemIndices; } @Override @@ -87,12 +92,25 @@ protected void masterOperation(Task task, final PutMappingRequest request, final final ActionListener listener) { try { final Index[] concreteIndices = resolveIndices(state, request, indexNameExpressionResolver); + final String mappingSource = request.source(); final Optional maybeValidationException = requestValidators.validateRequest(request, state, concreteIndices); if (maybeValidationException.isPresent()) { listener.onFailure(maybeValidationException.get()); return; } + + final List violations = checkForSystemIndexViolations(concreteIndices, mappingSource); + if (violations.isEmpty() == false) { + final String message = "Cannot update mappings in " + + violations + + ": system indices can only use mappings from their descriptors, " + + "but the mappings in the request did not match those in the descriptors(s)"; + logger.warn(message); + listener.onFailure(new IllegalArgumentException(message)); + return; + } + performMappingUpdate(concreteIndices, request, listener, metadataMappingService); } catch (IndexNotFoundException ex) { logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]", @@ -142,4 +160,21 @@ public void onFailure(Exception t) { }); } + private List checkForSystemIndexViolations(Index[] concreteIndices, String requestMappings) { + List violations = new ArrayList<>(); + + for (Index index : concreteIndices) { + final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName()); + if (descriptor != null && descriptor.isAutomaticallyManaged()) { + final String descriptorMappings = descriptor.getMappings(); + + // Technically we could trip over a difference in whitespace here, but then again nobody should be trying to manually + // update a descriptor's mappings. + if (descriptorMappings.equals(requestMappings) == false) { + violations.add(index.getName()); + } + } + } + return violations; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java index 5dfa44ebe0907..3f88689ea8d3c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java @@ -34,24 +34,36 @@ import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + public class TransportUpdateSettingsAction extends AcknowledgedTransportMasterNodeAction { private static final Logger logger = LogManager.getLogger(TransportUpdateSettingsAction.class); private final MetadataUpdateSettingsService updateSettingsService; + private final SystemIndices systemIndices; @Inject public TransportUpdateSettingsAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, MetadataUpdateSettingsService updateSettingsService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + SystemIndices systemIndices) { super(UpdateSettingsAction.NAME, transportService, clusterService, threadPool, actionFilters, UpdateSettingsRequest::new, indexNameExpressionResolver, ThreadPool.Names.SAME); this.updateSettingsService = updateSettingsService; + this.systemIndices = systemIndices; } @Override @@ -75,9 +87,24 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste protected void masterOperation(Task task, final UpdateSettingsRequest request, final ClusterState state, final ActionListener listener) { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + final Settings requestSettings = request.settings(); + + + final Map> systemIndexViolations = checkForSystemIndexViolations(concreteIndices, requestSettings); + if (systemIndexViolations.isEmpty() == false) { + final String message = "Cannot override settings on system indices: " + + systemIndexViolations.entrySet() + .stream() + .map(entry -> "[" + entry.getKey() + "] -> " + entry.getValue()) + .collect(Collectors.joining(", ")); + logger.warn(message); + listener.onFailure(new IllegalArgumentException(message)); + return; + } + UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest() .indices(concreteIndices) - .settings(request.settings()) + .settings(requestSettings) .setPreserveExisting(request.isPreserveExisting()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()); @@ -95,4 +122,37 @@ public void onFailure(Exception t) { } }); } + + /** + * Checks that if the request is trying to apply settings changes to any system indices, then the settings' values match those + * that the system index's descriptor expects. + * + * @param concreteIndices the indices being updated + * @param requestSettings the settings to be applied + * @return a mapping from system index pattern to the settings whose values would be overridden. Empty if there are no violations. + */ + private Map> checkForSystemIndexViolations(Index[] concreteIndices, Settings requestSettings) { + final Map> violations = new HashMap<>(); + + for (Index index : concreteIndices) { + final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName()); + if (descriptor != null && descriptor.isAutomaticallyManaged()) { + final Settings descriptorSettings = descriptor.getSettings(); + List failedKeys = new ArrayList<>(); + for (String key : requestSettings.keySet()) { + final String expectedValue = descriptorSettings.get(key); + final String actualValue = requestSettings.get(key); + + if (expectedValue.equals(actualValue) == false) { + failedKeys.add(key); + } + } + + if (failedKeys.isEmpty() == false) { + violations.put(descriptor.getIndexPattern(), failedKeys); + } + } + } + return violations; + } } diff --git a/server/src/main/java/org/elasticsearch/common/Strings.java b/server/src/main/java/org/elasticsearch/common/Strings.java index a303b49291e45..6c96488bcaa47 100644 --- a/server/src/main/java/org/elasticsearch/common/Strings.java +++ b/server/src/main/java/org/elasticsearch/common/Strings.java @@ -874,6 +874,21 @@ public static String cleanTruncate(String s, int length) { return s.substring(0, length); } + /** + * Checks that the supplied string is neither null nor empty, per {@link #isNullOrEmpty(String)}. + * If this check fails, then an {@link IllegalArgumentException} is thrown with the supplied message. + * + * @param str the String to check + * @param message the exception message to use if {@code str} is null or empty + * @return the supplied {@code str} + */ + public static String requireNonEmpty(String str, String message) { + if (isNullOrEmpty(str)) { + throw new IllegalArgumentException(message); + } + return str; + } + public static boolean isNullOrEmpty(@Nullable String s) { return s == null || s.isEmpty(); } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index 62f37c633b738..f7f97a21f6f8f 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -19,41 +19,152 @@ package org.elasticsearch.indices; +import org.apache.lucene.util.automaton.Automaton; import org.apache.lucene.util.automaton.CharacterRunAutomaton; -import org.elasticsearch.common.regex.Regex; +import org.apache.lucene.util.automaton.Operations; +import org.apache.lucene.util.automaton.RegExp; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.util.Objects; /** - * Describes a system index. Provides the information required to create and maintain the system index. + * A system index descriptor describes one or more system indices. It can match a number of indices using + * a pattern. For system indices that are managed externally to Elasticsearch, this is enough. For system + * indices that are managed internally to Elasticsearch, a descriptor can also include information for + * creating the system index, upgrading its mappings, and creating an alias. */ public class SystemIndexDescriptor { + /** A pattern, either with a wildcard or simple regex. Indices that match one of these patterns are considered system indices. */ private final String indexPattern; + + /** + * For internally-managed indices, specifies the name of the concrete index to create and update. This is required + * since the {@link #indexPattern} can match many indices. + */ + private final String primaryIndex; + + /** A description of the index or indices */ private final String description; + + /** Used to determine whether an index name matches the {@link #indexPattern} */ private final CharacterRunAutomaton indexPatternAutomaton; + /** For internally-managed indices, contains the index mappings JSON */ + private final String mappings; + + /** For internally-managed indices, contains the index settings */ + private final Settings settings; + + /** For internally-managed indices, an optional alias to create */ + private final String aliasName; + + /** For internally-managed indices, an optional {@link IndexMetadata#INDEX_FORMAT_SETTING} value to expect */ + private final int indexFormat; + /** - * + * For internally-managed indices, specifies a key name under _meta in the index mappings + * that contains the index's mappings' version. + */ + private final String versionMetaKey; + + /** For internally-managed indices, specifies the origin to use when creating or updating the index */ + private final String origin; + + /** + * Creates a descriptor for system indices matching the supplied pattern. These indices will not be managed + * by Elasticsearch internally. * @param indexPattern The pattern of index names that this descriptor will be used for. Must start with a '.' character. * @param description The name of the plugin responsible for this system index. */ public SystemIndexDescriptor(String indexPattern, String description) { + this(indexPattern, null, description, null, null, null, 0, null, null); + } + + /** + * Creates a descriptor for system indices matching the supplied pattern. These indices will be managed + * by Elasticsearch internally if mappings or settings are provided. + * + * @param indexPattern The pattern of index names that this descriptor will be used for. Must start with a '.' character. + * @param description The name of the plugin responsible for this system index. + * @param mappings The mappings to apply to this index when auto-creating, if appropriate + * @param settings The settings to apply to this index when auto-creating, if appropriate + * @param aliasName An alias for the index, or null + * @param indexFormat A value for the `index.format` setting. Pass 0 or higher. + * @param versionMetaKey a mapping key under _meta where a version can be found, which indicates the + * Elasticsearch version when the index was created. + * @param origin the client origin to use when creating this index. + */ + private SystemIndexDescriptor( + String indexPattern, + String primaryIndex, + String description, + String mappings, + Settings settings, + String aliasName, + int indexFormat, + String versionMetaKey, + String origin + ) { Objects.requireNonNull(indexPattern, "system index pattern must not be null"); if (indexPattern.length() < 2) { - throw new IllegalArgumentException("system index pattern provided as [" + indexPattern + - "] but must at least 2 characters in length"); + throw new IllegalArgumentException( + "system index pattern provided as [" + indexPattern + "] but must at least 2 characters in length" + ); } if (indexPattern.charAt(0) != '.') { - throw new IllegalArgumentException("system index pattern provided as [" + indexPattern + - "] but must start with the character [.]"); + throw new IllegalArgumentException( + "system index pattern provided as [" + indexPattern + "] but must start with the character [.]" + ); } if (indexPattern.charAt(1) == '*') { - throw new IllegalArgumentException("system index pattern provided as [" + indexPattern + - "] but must not start with the character sequence [.*] to prevent conflicts"); + throw new IllegalArgumentException( + "system index pattern provided as [" + + indexPattern + + "] but must not start with the character sequence [.*] to prevent conflicts" + ); + } + + if (primaryIndex != null) { + if (primaryIndex.charAt(0) != '.') { + throw new IllegalArgumentException( + "system primary index provided as [" + primaryIndex + "] but must start with the character [.]" + ); + } + if (primaryIndex.matches("^\\.[\\w-]+$") == false) { + throw new IllegalArgumentException( + "system primary index provided as [" + primaryIndex + "] but cannot contain special characters or patterns" + ); + } + } + + if (indexFormat < 0) { + throw new IllegalArgumentException("Index format cannot be negative"); + } + + Strings.requireNonEmpty(indexPattern, "indexPattern must be supplied"); + + if (mappings != null || settings != null) { + Strings.requireNonEmpty(primaryIndex, "Must supply primaryIndex if mappings or settings are defined"); + Strings.requireNonEmpty(versionMetaKey, "Must supply versionMetaKey if mappings or settings are defined"); + Strings.requireNonEmpty(origin, "Must supply origin if mappings or settings are defined"); } + this.indexPattern = indexPattern; - this.indexPatternAutomaton = new CharacterRunAutomaton(Regex.simpleMatchToAutomaton(indexPattern)); + this.primaryIndex = primaryIndex; + + final Automaton automaton = buildAutomaton(indexPattern, aliasName); + this.indexPatternAutomaton = new CharacterRunAutomaton(automaton); + this.description = description; + this.mappings = mappings; + this.settings = settings; + this.aliasName = aliasName; + this.indexFormat = indexFormat; + this.versionMetaKey = versionMetaKey; + this.origin = origin; } /** @@ -63,6 +174,14 @@ public String getIndexPattern() { return indexPattern; } + /** + * @return The concrete name of an index being managed internally to Elasticsearch. Will be {@code null} + * for indices managed externally to Elasticsearch. + */ + public String getPrimaryIndex() { + return primaryIndex; + } + /** * Checks whether an index name matches the system index name pattern for this descriptor. * @param index The index name to be checked against the index pattern given at construction time. @@ -81,10 +200,151 @@ public String getDescription() { @Override public String toString() { - return "SystemIndexDescriptor[pattern=[" + indexPattern + "], description=[" + description + "]]"; + return "SystemIndexDescriptor[pattern=[" + indexPattern + "], description=[" + description + "], aliasName=[" + aliasName + "]]"; + } + + public String getMappings() { + return mappings; + } + + public Settings getSettings() { + return settings; + } + + public String getAliasName() { + return aliasName; + } + + public int getIndexFormat() { + return this.indexFormat; + } + + public String getVersionMetaKey() { + return this.versionMetaKey; + } + + public boolean isAutomaticallyManaged() { + return this.mappings != null || this.settings != null; + } + + public String getOrigin() { + return this.origin; } - // TODO: Index settings and mapping // TODO: getThreadpool() // TODO: Upgrade handling (reindex script?) + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String indexPattern; + private String primaryIndex; + private String description; + private XContentBuilder mappingsBuilder = null; + private Settings settings = null; + private String aliasName = null; + private int indexFormat = 0; + private String versionMetaKey = null; + private String origin = null; + + private Builder() {} + + public Builder setIndexPattern(String indexPattern) { + this.indexPattern = indexPattern; + return this; + } + + public Builder setPrimaryIndex(String primaryIndex) { + this.primaryIndex = primaryIndex; + return this; + } + + public Builder setDescription(String description) { + this.description = description; + return this; + } + + public Builder setMappings(XContentBuilder mappingsBuilder) { + this.mappingsBuilder = mappingsBuilder; + return this; + } + + public Builder setSettings(Settings settings) { + this.settings = settings; + return this; + } + + public Builder setAliasName(String aliasName) { + this.aliasName = aliasName; + return this; + } + + public Builder setIndexFormat(int indexFormat) { + this.indexFormat = indexFormat; + return this; + } + + public Builder setVersionMetaKey(String versionMetaKey) { + this.versionMetaKey = versionMetaKey; + return this; + } + + public Builder setOrigin(String origin) { + this.origin = origin; + return this; + } + + public SystemIndexDescriptor build() { + String mappings = mappingsBuilder == null ? null : Strings.toString(mappingsBuilder); + + return new SystemIndexDescriptor( + indexPattern, + primaryIndex, + description, + mappings, + settings, + aliasName, + indexFormat, + versionMetaKey, + origin + ); + } + } + + /** + * Builds an automaton for matching index names against this descriptor's index pattern. + * If this descriptor has an alias name, the automaton will also try to match against + * the alias as well. + */ + static Automaton buildAutomaton(String pattern, String alias) { + final String patternAsRegex = patternToRegex(pattern); + final String aliasAsRegex = alias == null ? null : patternToRegex(alias); + + final Automaton patternAutomaton = new RegExp(patternAsRegex).toAutomaton(); + + if (aliasAsRegex == null) { + return patternAutomaton; + } + + final Automaton aliasAutomaton = new RegExp(aliasAsRegex).toAutomaton(); + + return Operations.union(patternAutomaton, aliasAutomaton); + } + + /** + * Translate a simple string pattern into a regular expression, suitable for creating a + * {@link RegExp} instance. This exists because although + * {@link org.elasticsearch.common.regex.Regex#simpleMatchToAutomaton(String)} is useful + * for simple patterns, it doesn't support character ranges. + * @param input the string to translate + * @return the translate string + */ + private static String patternToRegex(String input) { + String output = input; + output = output.replaceAll("\\.", "\\."); + output = output.replaceAll("\\*", ".*"); + return output; + } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexManager.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexManager.java new file mode 100644 index 0000000000000..ce427d61a4f07 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexManager.java @@ -0,0 +1,280 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.gateway.GatewayService; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_FORMAT_SETTING; + +/** + * This class ensures that all system indices have up-to-date mappings, provided + * those indices can be automatically managed. Only some system indices are managed + * internally to Elasticsearch - others are created and managed externally, e.g. + * Kibana indices. + */ +public class SystemIndexManager implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(SystemIndexManager.class); + + private final SystemIndices systemIndices; + private final Client client; + private final AtomicBoolean isUpgradeInProgress; + + public SystemIndexManager(SystemIndices systemIndices, Client client) { + this.systemIndices = systemIndices; + this.client = client; + this.isUpgradeInProgress = new AtomicBoolean(false); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + final ClusterState state = event.state(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we may think we don't have some + // indices but they may not have been restored from the cluster state on disk + logger.debug("system indices manager waiting until state has been recovered"); + return; + } + + // If this node is not a master node, exit. + if (state.nodes().isLocalNodeElectedMaster() == false) { + return; + } + + if (isUpgradeInProgress.compareAndSet(false, true)) { + final List descriptors = getEligibleDescriptors(state.getMetadata()).stream() + .filter(descriptor -> getUpgradeStatus(state, descriptor) == UpgradeStatus.NEEDS_MAPPINGS_UPDATE) + .collect(Collectors.toList()); + + if (descriptors.isEmpty() == false) { + // Use a GroupedActionListener so that we only release the lock once all upgrade attempts have succeeded or failed. + // The failures are logged in upgradeIndexMetadata(), so we don't actually care about them here. + ActionListener listener = new GroupedActionListener<>( + ActionListener.wrap(() -> isUpgradeInProgress.set(false)), + descriptors.size() + ); + + descriptors.forEach(descriptor -> upgradeIndexMetadata(descriptor, listener)); + } else { + isUpgradeInProgress.set(false); + } + } + } + + /** + * Checks all known system index descriptors, looking for those that correspond to + * indices that can be automatically managed and that have already been created. + * @param metadata the cluster state metadata to consult + * @return a list of descriptors that could potentially be updated + */ + List getEligibleDescriptors(Metadata metadata) { + return this.systemIndices.getSystemIndexDescriptors() + .stream() + .filter(SystemIndexDescriptor::isAutomaticallyManaged) + .filter(d -> metadata.hasConcreteIndex(d.getPrimaryIndex())) + .collect(Collectors.toList()); + } + + enum UpgradeStatus { + CLOSED, + UNHEALTHY, + NEEDS_UPGRADE, + UP_TO_DATE, + NEEDS_MAPPINGS_UPDATE + } + + /** + * Determines an index's current state, with respect to whether its mappings can + * be updated. + * + * @param clusterState the cluster state to use when calculating the upgrade state + * @param descriptor information about the system index to check + * @return a value that indicates the index's state. + */ + UpgradeStatus getUpgradeStatus(ClusterState clusterState, SystemIndexDescriptor descriptor) { + final State indexState = calculateIndexState(clusterState, descriptor); + + final String indexDescription = "Index [" + descriptor.getPrimaryIndex() + "] (alias [" + descriptor.getAliasName() + "])"; + + // The messages below will be logged on every cluster state update, which is why even in the index closed / red + // cases, the log levels are DEBUG. + + if (indexState.indexState == IndexMetadata.State.CLOSE) { + logger.debug("Index {} is closed. This is likely to prevent some features from functioning correctly", indexDescription); + return UpgradeStatus.CLOSED; + } + + if (indexState.indexHealth == ClusterHealthStatus.RED) { + logger.debug("Index {} health status is RED, any pending mapping upgrades will wait until this changes", indexDescription); + return UpgradeStatus.UNHEALTHY; + } + + if (indexState.isIndexUpToDate == false) { + logger.debug( + "Index {} is not on the current version. Features relying " + + "on the index will not be available until the index is upgraded", + indexDescription + ); + return UpgradeStatus.NEEDS_UPGRADE; + } else if (indexState.mappingUpToDate) { + logger.trace("Index {} is up-to-date, no action required", indexDescription); + return UpgradeStatus.UP_TO_DATE; + } else { + logger.info("Index {} mappings are not up-to-date and will be updated", indexDescription); + return UpgradeStatus.NEEDS_MAPPINGS_UPDATE; + } + } + + /** + * Updates the mappings for a system index + * @param descriptor information about the system index + * @param listener a listener to call upon success or failure + */ + private void upgradeIndexMetadata(SystemIndexDescriptor descriptor, ActionListener listener) { + final String indexName = descriptor.getPrimaryIndex(); + + PutMappingRequest request = new PutMappingRequest(indexName).source(descriptor.getMappings(), XContentType.JSON); + + final OriginSettingClient originSettingClient = new OriginSettingClient(this.client, descriptor.getOrigin()); + + originSettingClient.admin().indices().putMapping(request, new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + if (response.isAcknowledged() == false) { + String message = "Put mapping request for [" + indexName + "] was not acknowledged"; + logger.error(message); + listener.onFailure(new ElasticsearchException(message)); + } else { + listener.onResponse(response); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("Put mapping request for [" + indexName + "] failed", e); + listener.onFailure(e); + } + }); + } + + /** + * Derives a summary of the current state of a system index, relative to the given cluster state. + */ + State calculateIndexState(ClusterState state, SystemIndexDescriptor descriptor) { + final IndexMetadata indexMetadata = state.metadata().index(descriptor.getPrimaryIndex()); + assert indexMetadata != null; + + final boolean isIndexUpToDate = INDEX_FORMAT_SETTING.get(indexMetadata.getSettings()) == descriptor.getIndexFormat(); + + final boolean isMappingIsUpToDate = checkIndexMappingUpToDate(descriptor, indexMetadata); + final String concreteIndexName = indexMetadata.getIndex().getName(); + + final ClusterHealthStatus indexHealth; + final IndexMetadata.State indexState = indexMetadata.getState(); + + if (indexState == IndexMetadata.State.CLOSE) { + indexHealth = null; + logger.warn( + "Index [{}] (alias [{}]) is closed. This is likely to prevent some features from functioning correctly", + concreteIndexName, + descriptor.getAliasName() + ); + } else { + final IndexRoutingTable routingTable = state.getRoutingTable().index(indexMetadata.getIndex()); + indexHealth = new ClusterIndexHealth(indexMetadata, routingTable).getStatus(); + } + + return new State(indexState, indexHealth, isIndexUpToDate, isMappingIsUpToDate); + } + + /** Checks whether an index's mappings are up-to-date */ + private boolean checkIndexMappingUpToDate(SystemIndexDescriptor descriptor, IndexMetadata indexMetadata) { + final MappingMetadata mappingMetadata = indexMetadata.mapping(); + if (mappingMetadata == null) { + return false; + } + + return Version.CURRENT.equals(readMappingVersion(descriptor, mappingMetadata)); + } + + /** + * Fetches the mapping version from an index's mapping's `_meta` info. + */ + @SuppressWarnings("unchecked") + private Version readMappingVersion(SystemIndexDescriptor descriptor, MappingMetadata mappingMetadata) { + final String indexName = descriptor.getPrimaryIndex(); + try { + Map meta = (Map) mappingMetadata.sourceAsMap().get("_meta"); + if (meta == null) { + logger.warn("Missing _meta field in mapping [{}] of index [{}]", mappingMetadata.type(), indexName); + throw new IllegalStateException("Cannot read version string in index " + indexName); + } + + final String versionString = (String) meta.get(descriptor.getVersionMetaKey()); + if (versionString == null) { + logger.warn("No value found in mappings for [_meta.{}]", descriptor.getVersionMetaKey()); + } + return Version.fromString(versionString); + } catch (ElasticsearchParseException e) { + logger.error(new ParameterizedMessage("Cannot parse the mapping for index [{}]", indexName), e); + throw new ElasticsearchException("Cannot parse the mapping for index [{}]", e, indexName); + } + } + + static class State { + final IndexMetadata.State indexState; + final ClusterHealthStatus indexHealth; + final boolean isIndexUpToDate; + final boolean mappingUpToDate; + + State(IndexMetadata.State indexState, ClusterHealthStatus indexHealth, boolean isIndexUpToDate, boolean mappingUpToDate) { + this.indexState = indexState; + this.indexHealth = indexHealth; + this.isIndexUpToDate = isIndexUpToDate; + this.mappingUpToDate = mappingUpToDate; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index ae4a64111a588..91067c458f040 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -26,7 +26,6 @@ import org.apache.lucene.util.automaton.Operations; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.index.Index; import org.elasticsearch.tasks.TaskResultsService; @@ -57,13 +56,33 @@ public class SystemIndices { public SystemIndices(Map> pluginAndModulesDescriptors) { final Map> descriptorsMap = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors); checkForOverlappingPatterns(descriptorsMap); - this.systemIndexDescriptors = descriptorsMap.values() - .stream() - .flatMap(Collection::stream) - .collect(Collectors.toUnmodifiableList()); + this.systemIndexDescriptors = descriptorsMap.values().stream().flatMap(Collection::stream).collect(Collectors.toUnmodifiableList()); + checkForDuplicateAliases(this.systemIndexDescriptors); this.runAutomaton = buildCharacterRunAutomaton(systemIndexDescriptors); } + private void checkForDuplicateAliases(Collection descriptors) { + final Map aliasCounts = new HashMap<>(); + + for (SystemIndexDescriptor descriptor : descriptors) { + final String aliasName = descriptor.getAliasName(); + if (aliasName != null) { + aliasCounts.compute(aliasName, (alias, existingCount) -> 1 + (existingCount == null ? 0 : existingCount)); + } + } + + final List duplicateAliases = aliasCounts.entrySet() + .stream() + .filter(entry -> entry.getValue() > 1) + .map(Map.Entry::getKey) + .sorted() + .collect(Collectors.toList()); + + if (duplicateAliases.isEmpty() == false) { + throw new IllegalStateException("Found aliases associated with multiple system index descriptors: " + duplicateAliases + ""); + } + } + /** * Determines whether a given index is a system index by comparing its name to the collection of loaded {@link SystemIndexDescriptor}s * @param index the {@link Index} object to check against loaded {@link SystemIndexDescriptor}s @@ -114,7 +133,7 @@ public boolean isSystemIndex(String indexName) { private static CharacterRunAutomaton buildCharacterRunAutomaton(Collection descriptors) { Optional automaton = descriptors.stream() - .map(descriptor -> Regex.simpleMatchToAutomaton(descriptor.getIndexPattern())) + .map(descriptor -> SystemIndexDescriptor.buildAutomaton(descriptor.getIndexPattern(), descriptor.getAliasName())) .reduce(Operations::union); return new CharacterRunAutomaton(MinimizationOperations.minimize(automaton.orElse(Automata.makeEmpty()), Integer.MAX_VALUE)); } @@ -152,8 +171,8 @@ static void checkForOverlappingPatterns(Map> buildSystemIndexDe }); return Map.copyOf(map); } + + Collection getSystemIndexDescriptors() { + return this.systemIndexDescriptors; + } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index cbcd2ad7d57e1..ee8f0bcb1b92e 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -110,6 +110,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.indices.SystemIndexManager; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.BreakerSettings; @@ -498,6 +499,9 @@ protected Node(final Environment initialEnvironment, plugin -> plugin.getSystemIndexDescriptors(settings))); final SystemIndices systemIndices = new SystemIndices(systemIndexDescriptorMap); + final SystemIndexManager systemIndexManager = new SystemIndexManager(systemIndices, client); + clusterService.addListener(systemIndexManager); + final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); rerouteServiceReference.set(rerouteService); diff --git a/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java b/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java index 35d7a928be384..950494069472a 100644 --- a/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java +++ b/server/src/test/java/org/elasticsearch/indices/SystemIndexDescriptorTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; public class SystemIndexDescriptorTests extends ESTestCase { @@ -56,9 +57,33 @@ public void testValidation() { assertThat(ex.getMessage(), containsString("must not start with the character sequence [.*] to prevent conflicts")); } { - Exception ex = expectThrows(IllegalArgumentException.class, - () -> new SystemIndexDescriptor(".*" + randomAlphaOfLength(10), randomAlphaOfLength(5))); + Exception ex = expectThrows( + IllegalArgumentException.class, + () -> new SystemIndexDescriptor(".*" + randomAlphaOfLength(10), randomAlphaOfLength(5)) + ); assertThat(ex.getMessage(), containsString("must not start with the character sequence [.*] to prevent conflicts")); } + { + final String primaryIndex = randomAlphaOfLength(5); + Exception ex = expectThrows( + IllegalArgumentException.class, + () -> SystemIndexDescriptor.builder().setIndexPattern("." + primaryIndex).setPrimaryIndex(primaryIndex).build() + ); + assertThat( + ex.getMessage(), + equalTo("system primary index provided as [" + primaryIndex + "] but must start with the character [.]") + ); + } + { + final String primaryIndex = "." + randomAlphaOfLength(5) + "*"; + Exception ex = expectThrows( + IllegalArgumentException.class, + () -> SystemIndexDescriptor.builder().setIndexPattern("." + randomAlphaOfLength(5)).setPrimaryIndex(primaryIndex).build() + ); + assertThat( + ex.getMessage(), + equalTo("system primary index provided as [" + primaryIndex + "] but cannot contain special characters or patterns") + ); + } } } diff --git a/server/src/test/java/org/elasticsearch/indices/SystemIndexManagerTests.java b/server/src/test/java/org/elasticsearch/indices/SystemIndexManagerTests.java new file mode 100644 index 0000000000000..9832033abaccf --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/SystemIndexManagerTests.java @@ -0,0 +1,391 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.SystemIndexManager.UpgradeStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SystemIndexManagerTests extends ESTestCase { + + private static final ClusterName CLUSTER_NAME = new ClusterName("security-index-manager-tests"); + private static final ClusterState EMPTY_CLUSTER_STATE = new ClusterState.Builder(CLUSTER_NAME).build(); + + private static final String SYSTEM_INDEX_NAME = ".myindex-1"; + + private static final SystemIndexDescriptor DESCRIPTOR = SystemIndexDescriptor.builder() + .setIndexPattern(".myindex-*") + .setPrimaryIndex(SYSTEM_INDEX_NAME) + .setAliasName(".myindex") + .setIndexFormat(6) + .setSettings(getSettings()) + .setMappings(getMappings()) + .setVersionMetaKey("version") + .setOrigin("FAKE_ORIGIN") + .build(); + + private Client client; + + @Before + public void setUpManager() { + client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService()); + when(client.threadPool()).thenReturn(threadPool); + when(client.settings()).thenReturn(Settings.EMPTY); + } + + /** + * Check that the manager skips over descriptors whose indices cannot be managed. + */ + public void testManagerSkipsDescriptorsThatAreNotManaged() { + SystemIndexDescriptor d1 = new SystemIndexDescriptor(".foo-1", ""); + SystemIndexDescriptor d2 = SystemIndexDescriptor.builder() + .setIndexPattern(".bar-*") + .setPrimaryIndex(".bar-1") + .setMappings(getMappings()) + .setSettings(getSettings()) + .setVersionMetaKey("version") + .setOrigin("FAKE_ORIGIN") + .build(); + + SystemIndices systemIndices = new SystemIndices(Map.of("index 1", List.of(d1), "index 2", List.of(d2))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final List eligibleDescriptors = manager.getEligibleDescriptors( + Metadata.builder() + .put(getIndexMetadata(d1, null, 6, IndexMetadata.State.OPEN)) + .put(getIndexMetadata(d2, d2.getMappings(), 6, IndexMetadata.State.OPEN)) + .build() + ); + + assertThat(eligibleDescriptors, hasSize(1)); + assertThat(eligibleDescriptors, contains(d2)); + } + + /** + * Check that the manager skips over indices that don't exist yet, since system indices are + * created on-demand. + */ + public void testManagerSkipsDescriptorsForIndicesThatDoNotExist() { + SystemIndexDescriptor d1 = SystemIndexDescriptor.builder() + .setIndexPattern(".foo-*") + .setPrimaryIndex(".foo-1") + .setMappings(getMappings()) + .setSettings(getSettings()) + .setVersionMetaKey("version") + .setOrigin("FAKE_ORIGIN") + .build(); + SystemIndexDescriptor d2 = SystemIndexDescriptor.builder() + .setIndexPattern(".bar-*") + .setPrimaryIndex(".bar-1") + .setMappings(getMappings()) + .setSettings(getSettings()) + .setVersionMetaKey("version") + .setOrigin("FAKE_ORIGIN") + .build(); + + SystemIndices systemIndices = new SystemIndices(Map.of("index 1", List.of(d1), "index 2", List.of(d2))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final List eligibleDescriptors = manager.getEligibleDescriptors( + Metadata.builder().put(getIndexMetadata(d2, d2.getMappings(), 6, IndexMetadata.State.OPEN)).build() + ); + + assertThat(eligibleDescriptors, hasSize(1)); + assertThat(eligibleDescriptors, contains(d2)); + } + + /** + * Check that the manager won't try to upgrade closed indices. + */ + public void testManagerSkipsClosedIndices() { + SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", List.of(DESCRIPTOR))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final ClusterState.Builder clusterStateBuilder = createClusterState(IndexMetadata.State.CLOSE); + + assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.CLOSED)); + } + + /** + * Check that the manager won't try to upgrade unhealthy indices. + */ + public void testManagerSkipsIndicesWithRedStatus() { + SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", List.of(DESCRIPTOR))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final ClusterState.Builder clusterStateBuilder = createClusterState(); + markShardsUnavailable(clusterStateBuilder); + + assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.UNHEALTHY)); + } + + /** + * Check that the manager won't try to upgrade indices where the `index.format` setting + * is earlier than an expected value. + */ + public void testManagerSkipsIndicesWithOutdatedFormat() { + SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", List.of(DESCRIPTOR))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final ClusterState.Builder clusterStateBuilder = createClusterState(5); + markShardsAvailable(clusterStateBuilder); + + assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_UPGRADE)); + } + + /** + * Check that the manager won't try to upgrade indices where their mappings are already up-to-date. + */ + public void testManagerSkipsIndicesWithUpToDateMappings() { + SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", List.of(DESCRIPTOR))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final ClusterState.Builder clusterStateBuilder = createClusterState(); + markShardsAvailable(clusterStateBuilder); + + assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.UP_TO_DATE)); + } + + /** + * Check that the manager will try to upgrade indices where their mappings are out-of-date. + */ + public void testManagerProcessesIndicesWithOutdatedMappings() { + SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", List.of(DESCRIPTOR))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final ClusterState.Builder clusterStateBuilder = createClusterState(Strings.toString(getMappings("1.0.0"))); + markShardsAvailable(clusterStateBuilder); + + assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE)); + } + + /** + * Check that the manager submits the expected request for an index whose mappings are out-of-date. + */ + public void testManagerSubmitsPutRequest() { + SystemIndices systemIndices = new SystemIndices(Map.of("MyIndex", List.of(DESCRIPTOR))); + SystemIndexManager manager = new SystemIndexManager(systemIndices, client); + + final ClusterState.Builder clusterStateBuilder = createClusterState(Strings.toString(getMappings("1.0.0"))); + markShardsAvailable(clusterStateBuilder); + + manager.clusterChanged(event(clusterStateBuilder)); + + verify(client, times(1)).execute(any(PutMappingAction.class), any(PutMappingRequest.class), any()); + } + + private static ClusterState.Builder createClusterState() { + return createClusterState(SystemIndexManagerTests.DESCRIPTOR.getMappings()); + } + + private static ClusterState.Builder createClusterState(String mappings) { + return createClusterState(mappings, IndexMetadata.State.OPEN); + } + + private static ClusterState.Builder createClusterState(IndexMetadata.State state) { + return createClusterState(SystemIndexManagerTests.DESCRIPTOR.getMappings(), 6, state); + } + + private static ClusterState.Builder createClusterState(String mappings, IndexMetadata.State state) { + return createClusterState(mappings, 6, state); + } + + private static ClusterState.Builder createClusterState(int format) { + return createClusterState(SystemIndexManagerTests.DESCRIPTOR.getMappings(), format, IndexMetadata.State.OPEN); + } + + private static ClusterState.Builder createClusterState(String mappings, int format, IndexMetadata.State state) { + IndexMetadata.Builder indexMeta = getIndexMetadata(SystemIndexManagerTests.DESCRIPTOR, mappings, format, state); + + Metadata.Builder metadataBuilder = new Metadata.Builder(); + metadataBuilder.put(indexMeta); + + return ClusterState.builder(state()).metadata(metadataBuilder.build()); + } + + private void markShardsAvailable(ClusterState.Builder clusterStateBuilder) { + clusterStateBuilder.routingTable(buildIndexRoutingTable(DESCRIPTOR.getPrimaryIndex())); + } + + private void markShardsUnavailable(ClusterState.Builder clusterStateBuilder) { + final RoutingTable routingTable = buildIndexRoutingTable(DESCRIPTOR.getPrimaryIndex()); + + Index prevIndex = routingTable.index(DESCRIPTOR.getPrimaryIndex()).getIndex(); + + final RoutingTable unavailableRoutingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(prevIndex) + .addIndexShard( + new IndexShardRoutingTable.Builder(new ShardId(prevIndex, 0)).addShard( + ShardRouting.newUnassigned( + new ShardId(prevIndex, 0), + true, + RecoverySource.ExistingStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ) + .initialize(UUIDs.randomBase64UUID(random()), null, 0L) + .moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "")) + ).build() + ) + ) + .build(); + + clusterStateBuilder.routingTable(unavailableRoutingTable); + } + + private static ClusterState state() { + final DiscoveryNodes nodes = DiscoveryNodes.builder().masterNodeId("1").localNodeId("1").build(); + return ClusterState.builder(CLUSTER_NAME).nodes(nodes).metadata(Metadata.builder().generateClusterUuidIfNeeded()).build(); + } + + private static IndexMetadata.Builder getIndexMetadata( + SystemIndexDescriptor descriptor, + String mappings, + int format, + IndexMetadata.State state + ) { + IndexMetadata.Builder indexMetadata = IndexMetadata.builder( + descriptor.getPrimaryIndex() == null ? descriptor.getIndexPattern() : descriptor.getPrimaryIndex() + ); + + final Settings.Builder settingsBuilder = Settings.builder(); + if (descriptor.getSettings() != null) { + settingsBuilder.put(descriptor.getSettings()); + } else { + settingsBuilder.put(getSettings()); + } + settingsBuilder.put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), format); + indexMetadata.settings(settingsBuilder.build()); + + if (descriptor.getAliasName() != null) { + indexMetadata.putAlias(AliasMetadata.builder(descriptor.getAliasName()).build()); + } + indexMetadata.state(state); + if (mappings != null) { + indexMetadata.putMapping(mappings); + } + + return indexMetadata; + } + + private static RoutingTable buildIndexRoutingTable(String indexName) { + Index index = new Index(indexName, UUID.randomUUID().toString()); + ShardRouting shardRouting = ShardRouting.newUnassigned( + new ShardId(index, 0), + true, + RecoverySource.ExistingStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + String nodeId = ESTestCase.randomAlphaOfLength(8); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0)).addShard( + shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize()).moveToStarted() + ).build(); + return RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(table).build()).build(); + } + + private ClusterChangedEvent event(ClusterState.Builder clusterStateBuilder) { + return new ClusterChangedEvent("test-event", clusterStateBuilder.build(), EMPTY_CLUSTER_STATE); + } + + private static Settings getSettings() { + return Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), 6) + .build(); + } + + private static XContentBuilder getMappings() { + return getMappings(Version.CURRENT.toString()); + } + + private static XContentBuilder getMappings(String version) { + try { + final XContentBuilder builder = jsonBuilder(); + + builder.startObject(); + { + builder.startObject("_meta"); + builder.field("version", version); + builder.endObject(); + + builder.field("dynamic", "strict"); + builder.startObject("properties"); + { + builder.startObject("completed"); + builder.field("type", "boolean"); + builder.endObject(); + } + builder.endObject(); + } + + builder.endObject(); + return builder; + } catch (IOException e) { + throw new UncheckedIOException("Failed to build " + SYSTEM_INDEX_NAME + " index mappings", e); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java b/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java index 439df84e184b2..a61cb64e1b744 100644 --- a/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/SystemIndicesTests.java @@ -100,4 +100,61 @@ public void testPluginCannotOverrideBuiltInSystemIndex() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new SystemIndices(pluginMap)); assertThat(e.getMessage(), containsString("plugin or module attempted to define the same source")); } + + public void testPatternWithSimpleRange() { + + final SystemIndices systemIndices = new SystemIndices(Map.of("test", List.of(new SystemIndexDescriptor(".test-[abc]", "")))); + + assertThat(systemIndices.isSystemIndex(".test-a"), equalTo(true)); + assertThat(systemIndices.isSystemIndex(".test-b"), equalTo(true)); + assertThat(systemIndices.isSystemIndex(".test-c"), equalTo(true)); + + assertThat(systemIndices.isSystemIndex(".test-aa"), equalTo(false)); + assertThat(systemIndices.isSystemIndex(".test-d"), equalTo(false)); + assertThat(systemIndices.isSystemIndex(".test-"), equalTo(false)); + assertThat(systemIndices.isSystemIndex(".test-="), equalTo(false)); + } + + public void testPatternWithSimpleRangeAndRepeatOperator() { + final SystemIndices systemIndices = new SystemIndices(Map.of("test", List.of(new SystemIndexDescriptor(".test-[a]+", "")))); + + assertThat(systemIndices.isSystemIndex(".test-a"), equalTo(true)); + assertThat(systemIndices.isSystemIndex(".test-aa"), equalTo(true)); + assertThat(systemIndices.isSystemIndex(".test-aaa"), equalTo(true)); + + assertThat(systemIndices.isSystemIndex(".test-b"), equalTo(false)); + } + + public void testPatternWithComplexRange() { + final SystemIndices systemIndices = new SystemIndices(Map.of("test", List.of(new SystemIndexDescriptor(".test-[a-c]", "")))); + + assertThat(systemIndices.isSystemIndex(".test-a"), equalTo(true)); + assertThat(systemIndices.isSystemIndex(".test-b"), equalTo(true)); + assertThat(systemIndices.isSystemIndex(".test-c"), equalTo(true)); + + assertThat(systemIndices.isSystemIndex(".test-aa"), equalTo(false)); + assertThat(systemIndices.isSystemIndex(".test-d"), equalTo(false)); + assertThat(systemIndices.isSystemIndex(".test-"), equalTo(false)); + assertThat(systemIndices.isSystemIndex(".test-="), equalTo(false)); + } + + public void testOverlappingDescriptorsWithRanges() { + String source1 = "source1"; + String source2 = "source2"; + + SystemIndexDescriptor pattern1 = new SystemIndexDescriptor(".test-[ab]*", ""); + SystemIndexDescriptor pattern2 = new SystemIndexDescriptor(".test-a*", ""); + + Map> descriptors = new HashMap<>(); + descriptors.put(source1, List.of(pattern1)); + descriptors.put(source2, List.of(pattern2)); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> SystemIndices.checkForOverlappingPatterns(descriptors)); + + assertThat(exception.getMessage(), containsString("a system index descriptor [" + pattern1 + + "] from [" + source1 + "] overlaps with other system index descriptors:")); + + assertThat(exception.getMessage(), containsString(pattern2.toString() + " from [" + source2 + "]")); + } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index a6b30646cf701..a61011667a457 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -158,6 +158,7 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, null, logger); ActionFilters actionFilters = new ActionFilters(Collections.emptySet()); IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)); + SystemIndices systemIndices = new SystemIndices(Map.of()); DestructiveOperations destructiveOperations = new DestructiveOperations(SETTINGS, clusterSettings); Environment environment = TestEnvironment.newEnvironment(SETTINGS); Transport transport = mock(Transport.class); // it's not used @@ -233,11 +234,12 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, transportDeleteIndexAction = new TransportDeleteIndexAction(transportService, clusterService, threadPool, deleteIndexService, actionFilters, indexNameExpressionResolver, destructiveOperations); transportUpdateSettingsAction = new TransportUpdateSettingsAction( - transportService, clusterService, threadPool, metadataUpdateSettingsService, actionFilters, indexNameExpressionResolver); + transportService, clusterService, threadPool, metadataUpdateSettingsService, actionFilters, indexNameExpressionResolver, + systemIndices); transportClusterRerouteAction = new TransportClusterRerouteAction( transportService, clusterService, threadPool, allocationService, actionFilters, indexNameExpressionResolver); transportCreateIndexAction = new TransportCreateIndexAction( - transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); + transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver, systemIndices); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, (s, p, r) -> {}); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 42865ef327abe..555245343bd12 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1512,6 +1512,9 @@ protected NamedWriteableRegistry writeableRegistry() { threadPool, shardStateAction, actionFilters)); final MetadataMappingService metadataMappingService = new MetadataMappingService(clusterService, indicesService); peerRecoverySourceService = new PeerRecoverySourceService(transportService, indicesService, recoverySettings); + + final SystemIndices systemIndices = new SystemIndices(Map.of()); + indicesClusterStateService = new IndicesClusterStateService( settings, indicesService, @@ -1535,19 +1538,19 @@ protected NamedWriteableRegistry writeableRegistry() { shardStateAction, actionFilters, new IndexingPressure(settings), - new SystemIndices(Map.of()))), + systemIndices)), RetentionLeaseSyncer.EMPTY, client); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService); final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(settings, clusterService, indicesService, allocationService, new AliasValidator(), shardLimitValidator, environment, indexScopedSettings, - threadPool, namedXContentRegistry, new SystemIndices(Map.of()), false); + threadPool, namedXContentRegistry, systemIndices, false); actions.put(CreateIndexAction.INSTANCE, new TransportCreateIndexAction( transportService, clusterService, threadPool, metadataCreateIndexService, - actionFilters, indexNameExpressionResolver + actionFilters, indexNameExpressionResolver, systemIndices )); final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings); final IndexingPressure indexingMemoryLimits = new IndexingPressure(settings); @@ -1560,11 +1563,11 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index Collections.emptyList(), client), client, actionFilters, indexNameExpressionResolver, new IndexingPressure(settings), - new SystemIndices(Map.of()) + systemIndices )); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters, indexingMemoryLimits, new SystemIndices(Map.of())); + actionFilters, indexingMemoryLimits, systemIndices); actions.put(TransportShardBulkAction.TYPE, transportShardBulkAction); final RestoreService restoreService = new RestoreService( clusterService, repositoriesService, allocationService, @@ -1573,14 +1576,15 @@ clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedActi settings, namedXContentRegistry, mapperRegistry, indexScopedSettings, - new SystemIndices(Map.of()), + systemIndices, null), clusterSettings, shardLimitValidator ); actions.put(PutMappingAction.INSTANCE, new TransportPutMappingAction(transportService, clusterService, threadPool, metadataMappingService, - actionFilters, indexNameExpressionResolver, new RequestValidators<>(Collections.emptyList()))); + actionFilters, indexNameExpressionResolver, new RequestValidators<>(Collections.emptyList()), + new SystemIndices(Map.of()))); actions.put(AutoPutMappingAction.INSTANCE, new TransportAutoPutMappingAction(transportService, clusterService, threadPool, metadataMappingService, actionFilters, indexNameExpressionResolver)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 950a371e07c07..4030222d3bab3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.analysis.TokenizerFactory; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.shard.IndexSettingProvider; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -532,4 +533,11 @@ private List filterPlugins(Class type) { .collect(Collectors.toList()); } + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return this.filterPlugins(SystemIndexPlugin.class) + .stream() + .flatMap(p -> p.getSystemIndexDescriptors(this.settings).stream()) + .collect(Collectors.toList()); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index fcbcc2f8db6b3..0e3c7738a1243 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -123,8 +123,10 @@ protected void createAndPopulateIndex(String indexName, Settings.Builder setting protected void populateIndex(String indexName, int maxIndexRequests) throws InterruptedException { final List indexRequestBuilders = new ArrayList<>(); + // This index does not permit dynamic fields, so we can only use defined field names + final String key = indexName.equals(SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX) ? "type" : "foo"; for (int i = between(10, maxIndexRequests); i >= 0; i--) { - indexRequestBuilders.add(client().prepareIndex(indexName).setSource("foo", randomBoolean() ? "bar" : "baz")); + indexRequestBuilders.add(client().prepareIndex(indexName).setSource(key, randomBoolean() ? "bar" : "baz")); } indexRandom(true, true, indexRequestBuilders); refresh(indexName);