Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto create action #56122

Merged
merged 2 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
Expand Down Expand Up @@ -597,6 +601,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class);
actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class);

//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Api that auto creates an index that originate from requests that write into an index that doesn't yet exist.
*/
public final class AutoCreateAction extends ActionType<CreateIndexResponse> {

public static final AutoCreateAction INSTANCE = new AutoCreateAction();
public static final String NAME = "indices:admin/auto_create";

private AutoCreateAction() {
super(NAME, CreateIndexResponse::new);
}

public static final class TransportAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetadataCreateIndexService createIndexService;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver);
this.createIndexService = createIndexService;
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected CreateIndexResponse read(StreamInput in) throws IOException {
return new CreateIndexResponse(in);
}

@Override
protected void masterOperation(CreateIndexRequest request,
ClusterState state,
ActionListener<CreateIndexResponse> listener) throws Exception {
TransportCreateIndexAction.innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
}

@Override
protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,20 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt
@Override
protected void masterOperation(final CreateIndexRequest request, final ClusterState state,
final ActionListener<CreateIndexResponse> listener) {
String cause = request.cause();
if (cause.length() == 0) {
cause = "api";
if (request.cause().length() == 0) {
request.cause("api");
}

innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
}

static void innerCreateIndex(CreateIndexRequest request,
ActionListener<CreateIndexResponse> listener,
IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService) {
final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -241,7 +242,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.preferV2Templates(), bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
createIndex(index, bulkRequest.preferV2Templates(), bulkRequest.timeout(), minNodeVersion,
new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
Expand Down Expand Up @@ -385,13 +387,21 @@ boolean shouldAutoCreate(String index, ClusterState state) {
return autoCreateIndex.shouldAutoCreate(index, state);
}

void createIndex(String index, Boolean preferV2Templates, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
void createIndex(String index,
Boolean preferV2Templates,
TimeValue timeout,
Version minNodeVersion,
ActionListener<CreateIndexResponse> listener) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexRequest.preferV2Templates(preferV2Templates);
client.admin().indices().create(createIndexRequest, listener);
if (minNodeVersion.onOrAfter(Version.V_7_8_0)) {
client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
} else {
client.admin().indices().create(createIndexRequest, listener);
}
}

private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -111,7 +112,7 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
when(clusterService.state()).thenReturn(state);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(state.getNodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
when(discoveryNodes.getMinNodeVersion()).thenReturn(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(clusterService.localNode()).thenReturn(localNode);
when(localNode.isIngestNode()).thenReturn(randomBoolean());
Expand All @@ -138,7 +139,7 @@ boolean shouldAutoCreate(String index, ClusterState state) {

@Override
void createIndex(String index, Boolean preferV2Templates,
TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
// If we try to create an index just immediately assume it worked
listener.onResponse(new CreateIndexResponse(true, true, index) {});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -157,7 +158,8 @@ void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeN

@Override
void createIndex(String index, Boolean preferV2Templates,
TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
TimeValue timeout, Version minNodeVersion,
ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
listener.onResponse(null);
}
Expand Down Expand Up @@ -192,7 +194,7 @@ public void setupAction() {
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
when(nodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
when(nodes.getMinNodeVersion()).thenReturn(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
Metadata metadata = Metadata.builder().indices(ImmutableOpenMap.<String, IndexMetadata>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -40,6 +42,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -80,7 +83,7 @@ protected boolean needToCheck() {

@Override
void createIndex(String index, Boolean preferV2Templates,
TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
indexCreated = true;
listener.onResponse(null);
}
Expand All @@ -90,7 +93,9 @@ void createIndex(String index, Boolean preferV2Templates,
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
clusterService = createClusterService(threadPool);
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
clusterService = createClusterService(threadPool, discoveryNode);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.lucene.util.Constants;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -32,6 +33,8 @@
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -41,6 +44,7 @@
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -82,7 +86,9 @@ public static void afterClass() {
@Before
public void setUp() throws Exception {
super.setUp();
clusterService = createClusterService(threadPool);
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
clusterService = createClusterService(threadPool, discoveryNode);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistAction;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
Expand Down Expand Up @@ -60,7 +61,7 @@ public final class IndexPrivilege extends Privilege {
private static final Automaton MONITOR_AUTOMATON = patterns("indices:monitor/*");
private static final Automaton MANAGE_AUTOMATON =
unionAndMinimize(Arrays.asList(MONITOR_AUTOMATON, patterns("indices:admin/*")));
private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME);
private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME, AutoCreateAction.NAME);
private static final Automaton DELETE_INDEX_AUTOMATON = patterns(DeleteIndexAction.NAME);
private static final Automaton VIEW_METADATA_AUTOMATON = patterns(GetAliasesAction.NAME, AliasesExistAction.NAME,
GetIndexAction.NAME, IndicesExistsAction.NAME, GetFieldMappingsAction.NAME + "*", GetMappingsAction.NAME,
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/ml/qa/ml-with-security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ minimal:
# non-ML indices
- names: [ 'airline-data', 'index-*', 'unavailable-data', 'utopia' ]
privileges:
- indices:admin/create
- create_index
- indices:admin/refresh
- read
- index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ protected String configRoles() {
" cluster: [ ALL ]\n" +
" indices:\n" +
" - names: 'missing'\n" +
" privileges: [ 'indices:admin/create', 'indices:admin/delete' ]\n" +
" privileges: [ 'indices:admin/create', 'indices:admin/auto_create', " +
"'indices:admin/delete' ]\n" +
" - names: ['/index.*/']\n" +
" privileges: [ manage ]\n" +
" - names: ['/test.*/']\n" +
Expand Down
Loading