Skip to content

Commit

Permalink
Add infrastructure for managing system indices (#65604)
Browse files Browse the repository at this point in the history
Part of #61656.

Add the necessary support for automatically creating and updating system
indices. This works by making it possible to create a system index
descriptor with all the information needed to manage the mappings,
settings and aliases.

Follow-up work will opt existing indices into this framework.
  • Loading branch information
pugnascotia authored Dec 4, 2020
1 parent 201b25e commit cbd5d12
Show file tree
Hide file tree
Showing 16 changed files with 1,310 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.elasticsearch.action.admin.indices.create;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -40,18 +43,24 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
* Api that auto creates an index or data stream that originate from requests that write into an index that doesn't yet exist.
*/
public final class AutoCreateAction extends ActionType<CreateIndexResponse> {

private static final Logger logger = LogManager.getLogger(AutoCreateAction.class);

public static final AutoCreateAction INSTANCE = new AutoCreateAction();
public static final String NAME = "indices:admin/auto_create";

Expand All @@ -65,15 +74,17 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
private final MetadataCreateIndexService createIndexService;
private final MetadataCreateDataStreamService metadataCreateDataStreamService;
private final AutoCreateIndex autoCreateIndex;
private final SystemIndices systemIndices;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService,
MetadataCreateDataStreamService metadataCreateDataStreamService,
AutoCreateIndex autoCreateIndex) {
AutoCreateIndex autoCreateIndex, SystemIndices systemIndices) {
super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver,
CreateIndexResponse::new, ThreadPool.Names.SAME);
this.systemIndices = systemIndices;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.createIndexService = createIndexService;
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
Expand Down Expand Up @@ -141,12 +152,51 @@ public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
}

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);
CreateIndexClusterStateUpdateRequest updateRequest = descriptor != null && descriptor.isAutomaticallyManaged()
? buildSystemIndexUpdateRequest(descriptor)
: buildUpdateRequest(indexName);

return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
}
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
logger.debug("Auto-creating index {}", indexName);
return updateRequest;
}

private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(SystemIndexDescriptor descriptor) {
String mappings = descriptor.getMappings();
Settings settings = descriptor.getSettings();
String aliasName = descriptor.getAliasName();
String concreteIndexName = descriptor.getPrimaryIndex();

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), concreteIndexName, request.index())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());

updateRequest.waitForActiveShards(ActiveShardCount.ALL);

if (mappings != null) {
updateRequest.mappings(mappings);
}
if (settings != null) {
updateRequest.settings(settings);
}
if (aliasName != null) {
updateRequest.aliases(Set.of(new Alias(aliasName)));
}

logger.debug("Auto-creating system index {}", concreteIndexName);

return updateRequest;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -29,24 +31,33 @@
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Objects;
import java.util.Set;

/**
* Create index action.
*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetadataCreateIndexService createIndexService;
private final SystemIndices systemIndices;

@Inject
public TransportCreateIndexAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetadataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices) {
super(CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new,
indexNameExpressionResolver, CreateIndexResponse::new, ThreadPool.Names.SAME);
this.createIndexService = createIndexService;
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -58,20 +69,55 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt
protected void masterOperation(Task task, final CreateIndexRequest request, final ClusterState state,
final ActionListener<CreateIndexResponse> listener) {
String cause = request.cause();
if (cause.length() == 0) {
if (cause.isEmpty()) {
cause = "api";
}

final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());

final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);
final CreateIndexClusterStateUpdateRequest updateRequest = descriptor != null && descriptor.isAutomaticallyManaged()
? buildSystemIndexUpdateRequest(request, cause, descriptor)
: buildUpdateRequest(request, cause, indexName);

createIndexService.createIndex(updateRequest, listener.map(response ->
new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(CreateIndexRequest request, String cause, String indexName) {
return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings())
.mappings(request.mappings())
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());
}

private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(
CreateIndexRequest request,
String cause,
SystemIndexDescriptor descriptor
) {
final Settings settings = Objects.requireNonNullElse(descriptor.getSettings(), Settings.EMPTY);

final Set<Alias> aliases;
if (descriptor.getAliasName() == null) {
aliases = Set.of();
} else {
aliases = Set.of(new Alias(descriptor.getAliasName()));
}

final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
cause,
descriptor.getPrimaryIndex(),
request.index()
);

return updateRequest.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.aliases(aliases)
.waitForActiveShards(ActiveShardCount.ALL)
.mappings(descriptor.getMappings())
.settings(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -55,6 +57,7 @@ public class TransportPutMappingAction extends AcknowledgedTransportMasterNodeAc

private final MetadataMappingService metadataMappingService;
private final RequestValidators<PutMappingRequest> requestValidators;
private final SystemIndices systemIndices;

@Inject
public TransportPutMappingAction(
Expand All @@ -64,11 +67,13 @@ public TransportPutMappingAction(
final MetadataMappingService metadataMappingService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final RequestValidators<PutMappingRequest> requestValidators) {
final RequestValidators<PutMappingRequest> requestValidators,
final SystemIndices systemIndices) {
super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, PutMappingRequest::new,
indexNameExpressionResolver, ThreadPool.Names.SAME);
this.metadataMappingService = metadataMappingService;
this.requestValidators = Objects.requireNonNull(requestValidators);
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -87,12 +92,25 @@ protected void masterOperation(Task task, final PutMappingRequest request, final
final ActionListener<AcknowledgedResponse> listener) {
try {
final Index[] concreteIndices = resolveIndices(state, request, indexNameExpressionResolver);
final String mappingSource = request.source();

final Optional<Exception> maybeValidationException = requestValidators.validateRequest(request, state, concreteIndices);
if (maybeValidationException.isPresent()) {
listener.onFailure(maybeValidationException.get());
return;
}

final List<String> violations = checkForSystemIndexViolations(concreteIndices, mappingSource);
if (violations.isEmpty() == false) {
final String message = "Cannot update mappings in "
+ violations
+ ": system indices can only use mappings from their descriptors, "
+ "but the mappings in the request did not match those in the descriptors(s)";
logger.warn(message);
listener.onFailure(new IllegalArgumentException(message));
return;
}

performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
} catch (IndexNotFoundException ex) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]",
Expand Down Expand Up @@ -142,4 +160,21 @@ public void onFailure(Exception t) {
});
}

private List<String> checkForSystemIndexViolations(Index[] concreteIndices, String requestMappings) {
List<String> violations = new ArrayList<>();

for (Index index : concreteIndices) {
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName());
if (descriptor != null && descriptor.isAutomaticallyManaged()) {
final String descriptorMappings = descriptor.getMappings();

// Technically we could trip over a difference in whitespace here, but then again nobody should be trying to manually
// update a descriptor's mappings.
if (descriptorMappings.equals(requestMappings) == false) {
violations.add(index.getName());
}
}
}
return violations;
}
}
Loading

0 comments on commit cbd5d12

Please sign in to comment.