From 80af7b446290ade5522944ad5e4fcfe4174fd839 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Apr 2020 15:33:07 +0200 Subject: [PATCH 01/10] Add auto create action Currently the TransportBulkAction detects whether an index is missing and then decides whether it should be auto created. The coordination of the index creation also happens in the TransportBulkAction on the coordinating node. This change adds a new transport action that the TransportBulkAction delegates to if missing indices need to be created. The reasons for this change: * Auto creation of data streams can't occur on the coordinating node. Based on the index template (v2) either a regular index or a data stream should be created. However if the coordinating node is unaware of certain index templates then the TransportBulkAction could create an index instead of a data stream. Therefor the decision of whether an index or data stream should be created should happen on the master node. See #55377 * From a security perspective it is useful to know whether index creation originates from the create index api or from auto creating a new index via the bulk or index api. For example a user would be allowed to auto create an index, but not to use the create index api. The auto create action will allow security to distinguish these two different patterns of index creation. --- .../elasticsearch/action/ActionModule.java | 2 + .../indices/create/AutoCreateAction.java | 259 ++++++++++++++++++ .../action/bulk/TransportBulkAction.java | 113 ++++++-- .../indices/create/AutoCreateActionTests.java | 168 ++++++++++++ .../create/AutoCreateRequestTests.java | 36 +++ .../create/AutoCreateResponseTests.java | 44 +++ ...ActionIndicesThatCannotBeCreatedTests.java | 11 +- .../bulk/TransportBulkActionIngestTests.java | 12 +- .../action/bulk/TransportBulkActionTests.java | 17 +- .../bulk/TransportBulkActionTookTests.java | 8 +- 10 files changed, 642 insertions(+), 28 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index cb52f610607f8..6cb23c2b58e77 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; @@ -580,6 +581,7 @@ public void reg actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class); actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class); actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class); + actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class); //Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java new file mode 100644 index 0000000000000..841bc685773bf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -0,0 +1,259 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.create; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public final class AutoCreateAction extends ActionType { + + public static final AutoCreateAction INSTANCE = new AutoCreateAction(); + public static final String NAME = "indices:admin/auto_create"; + + private AutoCreateAction() { + super(NAME, Response::new); + } + + public static class Request extends MasterNodeReadRequest { + + private final Set names; + private final String cause; + private final Boolean preferV2Templates; + + public Request(Set names, String cause, Boolean preferV2Templates) { + this.names = Objects.requireNonNull(names); + this.cause = Objects.requireNonNull(cause); + this.preferV2Templates = preferV2Templates; + assert names.size() != 0; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.names = in.readSet(StreamInput::readString); + this.cause = in.readString(); + this.preferV2Templates = in.readOptionalBoolean(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringCollection(names); + out.writeString(cause); + out.writeOptionalBoolean(preferV2Templates); + } + + public Set getNames() { + return names; + } + + public String getCause() { + return cause; + } + + public Boolean getPreferV2Templates() { + return preferV2Templates; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return names.equals(request.names) && + Objects.equals(preferV2Templates, request.preferV2Templates); + } + + @Override + public int hashCode() { + return Objects.hash(names, preferV2Templates); + } + } + + public static class Response extends ActionResponse { + + private final Map failureByNames; + + public Response(Map failureByNames) { + this.failureByNames = failureByNames; + } + + public Response(StreamInput in) throws IOException { + super(in); + failureByNames = in.readMap(StreamInput::readString, StreamInput::readException); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(failureByNames, StreamOutput::writeString, StreamOutput::writeException); + } + + public Map getFailureByNames() { + return failureByNames; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + /* + * Exception does not implement equals(...) so we will compute the hash code based on the key set and the + * messages. + */ + return Objects.equals(getKeys(failureByNames), getKeys(response.failureByNames)) && + Objects.equals(getExceptionMessages(failureByNames), getExceptionMessages(response.failureByNames)); + } + + @Override + public int hashCode() { + /* + * Exception does not implement hash code so we will compute the hash code based on the key set and the + * messages. + */ + return Objects.hash(getKeys(failureByNames), getExceptionMessages(failureByNames)); + } + + private static List getExceptionMessages(final Map map) { + return map.values().stream().map(Throwable::getMessage).sorted(String::compareTo).collect(Collectors.toList()); + } + + private static List getKeys(final Map map) { + return map.keySet().stream().sorted(String::compareTo).collect(Collectors.toList()); + } + } + + public static final class TransportAction extends TransportMasterNodeAction { + + private final MetadataCreateIndexService createIndexService; + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataCreateIndexService createIndexService) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.createIndexService = createIndexService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void masterOperation(Task task, + Request request, + ClusterState state, + ActionListener listener) throws Exception { + // Should this be an AckedClusterStateUpdateTask and + // should we add ActiveShardsObserver.waitForActiveShards(...) here? + // (This api used from TransportBulkAction only and it currently ignores CreateIndexResponse, so + // I think there is no need to include acked and shard ackeds here) + clusterService.submitStateUpdateTask("auto create resources for [" + request.names + "]", + new ClusterStateUpdateTask(Priority.HIGH) { + + final Map result = new HashMap<>(); + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return autoCreate(request, result, currentState, createIndexService); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AutoCreateAction.Response(result)); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, request.names.toArray(new String[0])); + } + } + + static ClusterState autoCreate(Request request, + Map result, + ClusterState currentState, + MetadataCreateIndexService createIndexService) { + for (String indexName : request.names) { + CreateIndexClusterStateUpdateRequest req = new CreateIndexClusterStateUpdateRequest(request.cause, + indexName, indexName).masterNodeTimeout(request.masterNodeTimeout()) + .preferV2Templates(request.preferV2Templates); + try { + currentState = createIndexService.applyCreateIndexRequest(currentState, req, false); + result.put(indexName, null); + } catch (ResourceAlreadyExistsException e) { + // ignore resource already exists exception. + } catch (Exception e) { + result.put(indexName, e); + } + } + return currentState; + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index cfedb5777e790..d1aae56305ab4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.RoutingMissingException; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequest; @@ -56,6 +57,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -85,6 +87,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -235,36 +238,91 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener() { - @Override - public void onResponse(CreateIndexResponse result) { - if (counter.decrementAndGet() == 0) { - threadPool.executor(ThreadPool.Names.WRITE).execute( - () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); + if (state.getNodes().getMinNodeVersion().onOrAfter(Version.V_8_0_0)) { + CheckedConsumer handler = response -> { + Exception suppressed = null; + for (Map.Entry entry : response.getFailureByNames().entrySet()) { + Exception e = entry.getValue(); + if (e == null) { + continue; } - } - @Override - public void onFailure(Exception e) { - if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { - // fail all requests involving this index, if create didn't work - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { - bulkRequest.requests.set(i, null); - } + String index = entry.getKey(); + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest request = bulkRequest.requests.get(i); + if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { + bulkRequest.requests.set(i, null); } } - if (counter.decrementAndGet() == 0) { - executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { - inner.addSuppressed(e); - listener.onFailure(inner); - }), responses, indicesThatCannotBeCreated); + + if (suppressed == null) { + suppressed = e; + } else { + suppressed.addSuppressed(e); } } - }); + + final ActionListener finalListener; + if (suppressed != null) { + final Exception e = suppressed; + finalListener = ActionListener.wrap(listener::onResponse, inner -> { + inner.addSuppressed(e); + listener.onFailure(inner); + }); + } else { + finalListener = listener; + } + + threadPool.executor(ThreadPool.Names.WRITE).execute( + () -> executeBulk(task, bulkRequest, startTime, finalListener, responses, indicesThatCannotBeCreated)); + }; + Consumer failureHandler = e -> { + // major failure, set error on all bulk items and then return bulk response + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest request = bulkRequest.requests.get(i); + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(), e); + responses.set(i, new BulkItemResponse(i, request.opType(), failure)); + bulkRequest.requests.set(i, null); + } + executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { + inner.addSuppressed(e); + listener.onFailure(inner); + }), responses, indicesThatCannotBeCreated); + }; + autoCreate(autoCreateIndices, bulkRequest.preferV2Templates(), bulkRequest.timeout(), + ActionListener.wrap(handler, failureHandler)); + } else { + final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); + for (String index : autoCreateIndices) { + createIndex(index, bulkRequest.preferV2Templates(), bulkRequest.timeout(), new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse result) { + if (counter.decrementAndGet() == 0) { + threadPool.executor(ThreadPool.Names.WRITE).execute( + () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); + } + } + + @Override + public void onFailure(Exception e) { + if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { + // fail all requests involving this index, if create didn't work + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest request = bulkRequest.requests.get(i); + if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { + bulkRequest.requests.set(i, null); + } + } + } + if (counter.decrementAndGet() == 0) { + executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { + inner.addSuppressed(e); + listener.onFailure(inner); + }), responses, indicesThatCannotBeCreated); + } + } + }); + } } } } else { @@ -390,6 +448,13 @@ void createIndex(String index, Boolean preferV2Templates, TimeValue timeout, Act client.admin().indices().create(createIndexRequest, listener); } + void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, + ActionListener listener) { + AutoCreateAction.Request request = new AutoCreateAction.Request(names, "auto(bulk api)", preferV2Templates); + request.masterNodeTimeout(timeout); + client.execute(AutoCreateAction.INSTANCE, request, listener); + } + private boolean setResponseFailureIfIndexMatches(AtomicArray responses, int idx, DocWriteRequest request, String index, Exception e) { if (index.equals(request.index())) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java new file mode 100644 index 0000000000000..b7982b8e8c3ab --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java @@ -0,0 +1,168 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.create; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AutoCreateActionTests extends ESTestCase { + + public void testAutoCreate() throws Exception { + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); + + MetadataCreateIndexService mcis = mock(MetadataCreateIndexService.class); + expectApplyCreateIndexRequest(mcis, request, Set.of()); + + Map result = new HashMap<>(); + cs = AutoCreateAction.autoCreate(request, result, cs, mcis); + assertThat(cs.metadata().indices().size(), equalTo(3)); + assertThat(cs.metadata().index("index1"), notNullValue()); + assertThat(cs.metadata().index("index2"), notNullValue()); + assertThat(cs.metadata().index("index3"), notNullValue()); + assertThat(result.size(), equalTo(3)); + assertThat(result.containsKey("index1"), is(true)); + assertThat(result.get("index1"), nullValue()); + assertThat(result.containsKey("index2"), is(true)); + assertThat(result.get("index2"), nullValue()); + assertThat(result.containsKey("index3"), is(true)); + assertThat(result.get("index3"), nullValue()); + + verify(mcis, times(3)).applyCreateIndexRequest(any(ClusterState.class), + any(CreateIndexClusterStateUpdateRequest.class), eq(false)); + } + + public void testAutoCreateIndexAlreadyExists() throws Exception { + ClusterState cs = ClusterState.builder(new ClusterName("_name")).metadata(Metadata.builder() + .put(IndexMetadata.builder("index2") + .settings(Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .build()) + .numberOfShards(1) + .numberOfReplicas(1) + .build(), false)).build(); + Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); + + MetadataCreateIndexService mcis = mock(MetadataCreateIndexService.class); + expectApplyCreateIndexRequest(mcis, request, Set.of()); + + Map result = new HashMap<>(); + cs = AutoCreateAction.autoCreate(request, result, cs, mcis); + assertThat(cs.metadata().indices().size(), equalTo(3)); + assertThat(cs.metadata().index("index1"), notNullValue()); + assertThat(cs.metadata().index("index2"), notNullValue()); + assertThat(cs.metadata().index("index3"), notNullValue()); + assertThat(result.size(), equalTo(2)); + assertThat(result.containsKey("index1"), is(true)); + assertThat(result.get("index1"), nullValue()); + assertThat(result.containsKey("index2"), is(false)); + assertThat(result.get("index2"), nullValue()); + assertThat(result.containsKey("index3"), is(true)); + assertThat(result.get("index3"), nullValue()); + + verify(mcis, times(3)).applyCreateIndexRequest(any(ClusterState.class), + any(CreateIndexClusterStateUpdateRequest.class), eq(false)); + } + + public void testAutoCreateFailure() throws Exception { + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); + + MetadataCreateIndexService mcis = mock(MetadataCreateIndexService.class); + expectApplyCreateIndexRequest(mcis, request, Set.of("index2")); + + Map result = new HashMap<>(); + cs = AutoCreateAction.autoCreate(request, result, cs, mcis); + assertThat(cs.metadata().indices().size(), equalTo(2)); + assertThat(cs.metadata().index("index1"), notNullValue()); + assertThat(cs.metadata().index("index2"), nullValue()); + assertThat(cs.metadata().index("index3"), notNullValue()); + assertThat(result.size(), equalTo(3)); + assertThat(result.containsKey("index1"), is(true)); + assertThat(result.get("index1"), nullValue()); + assertThat(result.containsKey("index2"), is(true)); + assertThat(result.get("index2"), notNullValue()); + assertThat(result.get("index2").getMessage(), equalTo("fail!")); + assertThat(result.containsKey("index3"), is(true)); + assertThat(result.get("index3"), nullValue()); + + verify(mcis, times(3)).applyCreateIndexRequest(any(ClusterState.class), + any(CreateIndexClusterStateUpdateRequest.class), eq(false)); + } + + private void expectApplyCreateIndexRequest(MetadataCreateIndexService mcis, + Request request, + Set indicesToFail) throws Exception { + when(mcis.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), eq(false))) + .thenAnswer(mockInvocation -> { + ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; + CreateIndexClusterStateUpdateRequest updateRequest = + (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1]; + assertThat(updateRequest.index(), in(request.getNames())); + assertThat(updateRequest.getProvidedName(), in(request.getNames())); + assertThat(updateRequest.cause(), equalTo(request.getCause())); + assertThat(updateRequest.masterNodeTimeout(), equalTo(request.masterNodeTimeout())); + assertThat(updateRequest.preferV2Templates(), equalTo(request.getPreferV2Templates())); + + if (currentState.metadata().hasIndex(updateRequest.index())) { + throw new ResourceAlreadyExistsException(currentState.metadata().index(updateRequest.index()).getIndex()); + } + + if (indicesToFail.contains(updateRequest.index())) { + throw new Exception("fail!"); + } + + Metadata.Builder b = Metadata.builder(currentState.metadata()) + .put(IndexMetadata.builder(updateRequest.index()) + .settings(Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(updateRequest.settings()) + .build()) + .numberOfShards(1) + .numberOfReplicas(1) + .build(), false); + return ClusterState.builder(currentState).metadata(b.build()).build(); + }); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java new file mode 100644 index 0000000000000..e235e0a98ae3a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.create; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class AutoCreateRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return AutoCreateAction.Request::new; + } + + @Override + protected AutoCreateAction.Request createTestInstance() { + return new AutoCreateAction.Request(randomUnique(() -> randomAlphaOfLength(4), 4), randomAlphaOfLength(4), + randomBoolean()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java new file mode 100644 index 0000000000000..1135a86044bb2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.create; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.NavigableMap; +import java.util.TreeMap; + +public class AutoCreateResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return AutoCreateAction.Response::new; + } + + @Override + protected AutoCreateAction.Response createTestInstance() { + int numEntries = randomIntBetween(0, 16); + NavigableMap map = new TreeMap<>(); + for (int i = 0; i < numEntries; i++) { + map.put(randomAlphaOfLength(4), new ElasticsearchException(randomAlphaOfLength(4))); + } + return new AutoCreateAction.Response(map); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 096b5fca490c7..5d0448a9279a1 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -38,6 +39,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -111,7 +113,7 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, when(clusterService.state()).thenReturn(state); DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); when(state.getNodes()).thenReturn(discoveryNodes); - when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.CURRENT); + when(discoveryNodes.getMinNodeVersion()).thenReturn(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT)); DiscoveryNode localNode = mock(DiscoveryNode.class); when(clusterService.localNode()).thenReturn(localNode); when(localNode.isIngestNode()).thenReturn(randomBoolean()); @@ -142,6 +144,13 @@ void createIndex(String index, Boolean preferV2Templates, // If we try to create an index just immediately assume it worked listener.onResponse(new CreateIndexResponse(true, true, index) {}); } + + @Override + void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, + ActionListener listener) { + // If we try to create an index just immediately assume it worked + listener.onResponse(new AutoCreateAction.Response(Map.of())); + } }; action.doExecute(null, bulkRequest, null); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index abbae57a0ac9a..3212e53478040 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; @@ -55,6 +56,7 @@ import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -67,6 +69,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -162,6 +165,13 @@ void createIndex(String index, Boolean preferV2Templates, indexCreated = true; listener.onResponse(null); } + + @Override + void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, + ActionListener listener) { + indexCreated = true; + listener.onResponse(new AutoCreateAction.Response(Map.of())); + } } class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction { @@ -193,7 +203,7 @@ public void setupAction() { ImmutableOpenMap ingestNodes = ImmutableOpenMap.builder(2) .fPut("node1", remoteNode1).fPut("node2", remoteNode2).build(); when(nodes.getIngestNodes()).thenReturn(ingestNodes); - when(nodes.getMinNodeVersion()).thenReturn(Version.CURRENT); + when(nodes.getMinNodeVersion()).thenReturn(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT)); ClusterState state = mock(ClusterState.class); when(state.getNodes()).thenReturn(nodes); Metadata metadata = Metadata.builder().indices(ImmutableOpenMap.builder() diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 1d25a18c883b8..a41138b67d5ce 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver; import org.elasticsearch.action.delete.DeleteRequest; @@ -33,6 +34,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -41,6 +44,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -50,6 +54,8 @@ import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; @@ -86,13 +92,22 @@ void createIndex(String index, Boolean preferV2Templates, indexCreated = true; listener.onResponse(null); } + + @Override + void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, + ActionListener listener) { + indexCreated = true; + listener.onResponse(new AutoCreateAction.Response(Map.of())); + } } @Before public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); - clusterService = createClusterService(threadPool); + DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT)); + clusterService = createClusterService(threadPool, discoveryNode); CapturingTransport capturingTransport = new CapturingTransport(); transportService = capturingTransport.createTransportService(clusterService.getSettings(), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 2b2c22cd9be5c..e084a7a7bdcbc 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk; import org.apache.lucene.util.Constants; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -32,6 +33,8 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -40,6 +43,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -81,7 +85,9 @@ public static void afterClass() { @Before public void setUp() throws Exception { super.setUp(); - clusterService = createClusterService(threadPool); + DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, VersionUtils.randomCompatibleVersion(random(), Version.CURRENT)); + clusterService = createClusterService(threadPool, discoveryNode); } @After From 48a8d977837a6976a020d72888581ea210e67efe Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Apr 2020 16:02:55 +0200 Subject: [PATCH 02/10] take into account date math in index names. --- .../indices/create/AutoCreateAction.java | 6 +++-- .../indices/create/AutoCreateActionTests.java | 23 +++++++++++-------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 841bc685773bf..c86822430eefc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -220,7 +220,7 @@ public void onFailure(String source, Exception e) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return autoCreate(request, result, currentState, createIndexService); + return autoCreate(request, result, currentState, createIndexService, indexNameExpressionResolver); } @Override @@ -239,8 +239,10 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) static ClusterState autoCreate(Request request, Map result, ClusterState currentState, - MetadataCreateIndexService createIndexService) { + MetadataCreateIndexService createIndexService, + IndexNameExpressionResolver resolver) { for (String indexName : request.names) { + indexName = resolver.resolveDateMathExpression(indexName); CreateIndexClusterStateUpdateRequest req = new CreateIndexClusterStateUpdateRequest(request.cause, indexName, indexName).masterNodeTimeout(request.masterNodeTimeout()) .preferV2Templates(request.preferV2Templates); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java index b7982b8e8c3ab..f675697a64826 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java @@ -24,10 +24,12 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.util.HashMap; import java.util.Map; @@ -47,15 +49,22 @@ public class AutoCreateActionTests extends ESTestCase { + private MetadataCreateIndexService mcis; + private IndexNameExpressionResolver resolver; + + @Before + public void setup() { + mcis = mock(MetadataCreateIndexService.class); + resolver = new IndexNameExpressionResolver(); + } + public void testAutoCreate() throws Exception { ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - - MetadataCreateIndexService mcis = mock(MetadataCreateIndexService.class); expectApplyCreateIndexRequest(mcis, request, Set.of()); Map result = new HashMap<>(); - cs = AutoCreateAction.autoCreate(request, result, cs, mcis); + cs = AutoCreateAction.autoCreate(request, result, cs, mcis, resolver); assertThat(cs.metadata().indices().size(), equalTo(3)); assertThat(cs.metadata().index("index1"), notNullValue()); assertThat(cs.metadata().index("index2"), notNullValue()); @@ -82,12 +91,10 @@ public void testAutoCreateIndexAlreadyExists() throws Exception { .numberOfReplicas(1) .build(), false)).build(); Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - - MetadataCreateIndexService mcis = mock(MetadataCreateIndexService.class); expectApplyCreateIndexRequest(mcis, request, Set.of()); Map result = new HashMap<>(); - cs = AutoCreateAction.autoCreate(request, result, cs, mcis); + cs = AutoCreateAction.autoCreate(request, result, cs, mcis, resolver); assertThat(cs.metadata().indices().size(), equalTo(3)); assertThat(cs.metadata().index("index1"), notNullValue()); assertThat(cs.metadata().index("index2"), notNullValue()); @@ -107,12 +114,10 @@ public void testAutoCreateIndexAlreadyExists() throws Exception { public void testAutoCreateFailure() throws Exception { ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - - MetadataCreateIndexService mcis = mock(MetadataCreateIndexService.class); expectApplyCreateIndexRequest(mcis, request, Set.of("index2")); Map result = new HashMap<>(); - cs = AutoCreateAction.autoCreate(request, result, cs, mcis); + cs = AutoCreateAction.autoCreate(request, result, cs, mcis, resolver); assertThat(cs.metadata().indices().size(), equalTo(2)); assertThat(cs.metadata().index("index1"), notNullValue()); assertThat(cs.metadata().index("index2"), nullValue()); From 0b241508ad0fa70e3552efb4556731aa67147875 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Apr 2020 17:23:38 +0200 Subject: [PATCH 03/10] Change request class to implement `IndicesRequest` interface, similar to create index request. --- .../admin/indices/create/AutoCreateAction.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index c86822430eefc..75c2bbaa1c0cd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -23,7 +23,9 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; @@ -59,7 +61,7 @@ private AutoCreateAction() { super(NAME, Response::new); } - public static class Request extends MasterNodeReadRequest { + public static class Request extends MasterNodeReadRequest implements IndicesRequest { private final Set names; private final String cause; @@ -92,6 +94,16 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalBoolean(preferV2Templates); } + @Override + public String[] indices() { + return names.toArray(new String[0]); + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + public Set getNames() { return names; } From 9fe33c94ea74c1426565c3137117dd1392468674 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Apr 2020 20:38:12 +0200 Subject: [PATCH 04/10] Split index creation part of the AutoCreateAction class to AutoCreateIndexAction. This is needed, because AutoCreateAction will make the decision whether to auto create an data stream or index, and AutoCreateIndexAction really auto creates the index. Future change will add AutoCreateDataStream action. If this logic is in a single action then we can't distingues whether a data stream or index is being auto created, which is important because a user may be able to auto create a data stream, but not an index or visa versa. --- .../elasticsearch/action/ActionModule.java | 2 + .../indices/create/AutoCreateAction.java | 71 ++------- .../indices/create/AutoCreateIndexAction.java | 143 ++++++++++++++++++ ...s.java => AutoCreateIndexActionTests.java} | 8 +- .../authz/privilege/IndexPrivilege.java | 5 +- .../security/authz/AuthorizationService.java | 13 +- .../security/authz/WriteActionsTests.java | 2 +- 7 files changed, 175 insertions(+), 69 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java rename server/src/test/java/org/elasticsearch/action/admin/indices/create/{AutoCreateActionTests.java => AutoCreateIndexActionTests.java} (96%) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 6cb23c2b58e77..6936851a1366b 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; +import org.elasticsearch.action.admin.indices.create.AutoCreateIndexAction; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; @@ -582,6 +583,7 @@ public void reg actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class); actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class); actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class); + actions.register(AutoCreateIndexAction.INSTANCE, AutoCreateIndexAction.TransportAction.class); //Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 75c2bbaa1c0cd..0d2f64f81ba4e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.action.admin.indices.create; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; @@ -28,30 +27,30 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +/** + * Proxy action for auto creating an indexable resource. + * Currently only auto creates indices by redirecting to {@link AutoCreateIndexAction}. + */ public final class AutoCreateAction extends ActionType { public static final AutoCreateAction INSTANCE = new AutoCreateAction(); @@ -186,14 +185,13 @@ private static List getKeys(final Map map) { public static final class TransportAction extends TransportMasterNodeAction { - private final MetadataCreateIndexService createIndexService; + private final Client client; @Inject public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - MetadataCreateIndexService createIndexService) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client) { super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); - this.createIndexService = createIndexService; + this.client = client; } @Override @@ -211,35 +209,8 @@ protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - // Should this be an AckedClusterStateUpdateTask and - // should we add ActiveShardsObserver.waitForActiveShards(...) here? - // (This api used from TransportBulkAction only and it currently ignores CreateIndexResponse, so - // I think there is no need to include acked and shard ackeds here) - clusterService.submitStateUpdateTask("auto create resources for [" + request.names + "]", - new ClusterStateUpdateTask(Priority.HIGH) { - - final Map result = new HashMap<>(); - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return autoCreate(request, result, currentState, createIndexService, indexNameExpressionResolver); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new AutoCreateAction.Response(result)); - } - }); + // For now always redirect to the auto create index action, because only indices get auto created. + client.execute(AutoCreateIndexAction.INSTANCE, request, listener); } @Override @@ -248,26 +219,4 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } } - static ClusterState autoCreate(Request request, - Map result, - ClusterState currentState, - MetadataCreateIndexService createIndexService, - IndexNameExpressionResolver resolver) { - for (String indexName : request.names) { - indexName = resolver.resolveDateMathExpression(indexName); - CreateIndexClusterStateUpdateRequest req = new CreateIndexClusterStateUpdateRequest(request.cause, - indexName, indexName).masterNodeTimeout(request.masterNodeTimeout()) - .preferV2Templates(request.preferV2Templates); - try { - currentState = createIndexService.applyCreateIndexRequest(currentState, req, false); - result.put(indexName, null); - } catch (ResourceAlreadyExistsException e) { - // ignore resource already exists exception. - } catch (Exception e) { - result.put(indexName, e); - } - } - return currentState; - } - } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java new file mode 100644 index 0000000000000..829dc2a49d3e0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java @@ -0,0 +1,143 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.create; + +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Api that auto creates an index that originate from requests that write into an index that doesn't yet exist. + */ +public final class AutoCreateIndexAction extends ActionType { + + public static final AutoCreateIndexAction INSTANCE = new AutoCreateIndexAction(); + public static final String NAME = "indices:admin/auto_create_index"; + + private AutoCreateIndexAction() { + super(NAME, AutoCreateAction.Response::new); + } + + public static final class TransportAction extends TransportMasterNodeAction { + + private final MetadataCreateIndexService createIndexService; + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataCreateIndexService createIndexService) { + super(NAME, transportService, clusterService, threadPool, actionFilters, AutoCreateAction.Request::new, indexNameExpressionResolver); + this.createIndexService = createIndexService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AutoCreateAction.Response read(StreamInput in) throws IOException { + return new AutoCreateAction.Response(in); + } + + @Override + protected void masterOperation(Task task, + AutoCreateAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + // Should this be an AckedClusterStateUpdateTask and + // should we add ActiveShardsObserver.waitForActiveShards(...) here? + // (This api used from TransportBulkAction only and it currently ignores CreateIndexResponse, so + // I think there is no need to include acked and shard ackeds here) + clusterService.submitStateUpdateTask("auto create resources for [" + request.getNames() + "]", + new ClusterStateUpdateTask(Priority.HIGH) { + + final Map result = new HashMap<>(); + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return autoCreate(request, result, currentState, createIndexService, indexNameExpressionResolver); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AutoCreateAction.Response(result)); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(AutoCreateAction.Request request, ClusterState state) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getNames().toArray(new String[0])); + } + } + + static ClusterState autoCreate(AutoCreateAction.Request request, + Map result, + ClusterState currentState, + MetadataCreateIndexService createIndexService, + IndexNameExpressionResolver resolver) { + for (String indexName : request.getNames()) { + indexName = resolver.resolveDateMathExpression(indexName); + CreateIndexClusterStateUpdateRequest req = new CreateIndexClusterStateUpdateRequest(request.getCause(), + indexName, indexName).masterNodeTimeout(request.masterNodeTimeout()) + .preferV2Templates(request.getPreferV2Templates()); + try { + currentState = createIndexService.applyCreateIndexRequest(currentState, req, false); + result.put(indexName, null); + } catch (ResourceAlreadyExistsException e) { + // ignore resource already exists exception. + } catch (Exception e) { + result.put(indexName, e); + } + } + return currentState; + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexActionTests.java similarity index 96% rename from server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java rename to server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexActionTests.java index f675697a64826..1e154d789889e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexActionTests.java @@ -47,7 +47,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class AutoCreateActionTests extends ESTestCase { +public class AutoCreateIndexActionTests extends ESTestCase { private MetadataCreateIndexService mcis; private IndexNameExpressionResolver resolver; @@ -64,7 +64,7 @@ public void testAutoCreate() throws Exception { expectApplyCreateIndexRequest(mcis, request, Set.of()); Map result = new HashMap<>(); - cs = AutoCreateAction.autoCreate(request, result, cs, mcis, resolver); + cs = AutoCreateIndexAction.autoCreate(request, result, cs, mcis, resolver); assertThat(cs.metadata().indices().size(), equalTo(3)); assertThat(cs.metadata().index("index1"), notNullValue()); assertThat(cs.metadata().index("index2"), notNullValue()); @@ -94,7 +94,7 @@ public void testAutoCreateIndexAlreadyExists() throws Exception { expectApplyCreateIndexRequest(mcis, request, Set.of()); Map result = new HashMap<>(); - cs = AutoCreateAction.autoCreate(request, result, cs, mcis, resolver); + cs = AutoCreateIndexAction.autoCreate(request, result, cs, mcis, resolver); assertThat(cs.metadata().indices().size(), equalTo(3)); assertThat(cs.metadata().index("index1"), notNullValue()); assertThat(cs.metadata().index("index2"), notNullValue()); @@ -117,7 +117,7 @@ public void testAutoCreateFailure() throws Exception { expectApplyCreateIndexRequest(mcis, request, Set.of("index2")); Map result = new HashMap<>(); - cs = AutoCreateAction.autoCreate(request, result, cs, mcis, resolver); + cs = AutoCreateIndexAction.autoCreate(request, result, cs, mcis, resolver); assertThat(cs.metadata().indices().size(), equalTo(2)); assertThat(cs.metadata().index("index1"), notNullValue()); assertThat(cs.metadata().index("index2"), nullValue()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java index 47dad67ee1af9..d35a1e7a9f8b6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; import org.elasticsearch.action.admin.indices.close.CloseIndexAction; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; +import org.elasticsearch.action.admin.indices.create.AutoCreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexAction; @@ -57,7 +59,8 @@ public final class IndexPrivilege extends Privilege { private static final Automaton MONITOR_AUTOMATON = patterns("indices:monitor/*"); private static final Automaton MANAGE_AUTOMATON = unionAndMinimize(Arrays.asList(MONITOR_AUTOMATON, patterns("indices:admin/*"))); - private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME); + private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME, AutoCreateAction.NAME, + AutoCreateIndexAction.NAME); private static final Automaton DELETE_INDEX_AUTOMATON = patterns(DeleteIndexAction.NAME); private static final Automaton VIEW_METADATA_AUTOMATON = patterns(GetAliasesAction.NAME, GetIndexAction.NAME, GetFieldMappingsAction.NAME + "*", GetMappingsAction.NAME, diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index e2a335614eecb..9803b7fce9c3f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -14,12 +14,14 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.elasticsearch.action.update.UpdateAction; @@ -294,8 +296,15 @@ private void handleIndexActionAuthorizationResult(final IndexAuthorizationResult } //if we are creating an index we need to authorize potential aliases created at the same time if (IndexPrivilege.CREATE_INDEX_MATCHER.test(action)) { - assert request instanceof CreateIndexRequest; - Set aliases = ((CreateIndexRequest) request).aliases(); + final Set aliases; + if (request instanceof CreateIndexRequest) { + aliases = ((CreateIndexRequest) request).aliases(); + } else if (request instanceof AutoCreateAction.Request) { + aliases = Set.of(); + } else { + aliases = Set.of(); + assert false; + } if (aliases.isEmpty()) { runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener); } else { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java index b833e71deca3f..94f5c791be5c4 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java @@ -36,7 +36,7 @@ protected String configRoles() { " cluster: [ ALL ]\n" + " indices:\n" + " - names: 'missing'\n" + - " privileges: [ 'indices:admin/create', 'indices:admin/delete' ]\n" + + " privileges: [ 'indices:admin/create', 'indices:admin/auto_create', 'indices:admin/auto_create_index', 'indices:admin/delete' ]\n" + " - names: ['/index.*/']\n" + " privileges: [ manage ]\n" + " - names: ['/test.*/']\n" + From 8431d37c553d817275659fe00ea3b62b0d03fadf Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Apr 2020 21:43:30 +0200 Subject: [PATCH 05/10] fixed checkstyle violations --- .../action/admin/indices/create/AutoCreateIndexAction.java | 3 ++- .../xpack/security/authz/AuthorizationService.java | 1 - .../elasticsearch/xpack/security/authz/WriteActionsTests.java | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java index 829dc2a49d3e0..756f3d4694d2c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java @@ -62,7 +62,8 @@ public static final class TransportAction extends TransportMasterNodeAction Date: Wed, 29 Apr 2020 11:51:54 +0200 Subject: [PATCH 06/10] iter --- .../indices/create/AutoCreateAction.java | 54 ++++-- .../indices/create/AutoCreateIndexAction.java | 78 ++------ .../create/TransportCreateIndexAction.java | 14 +- .../action/bulk/TransportBulkAction.java | 2 +- .../indices/create/AutoCreateActionTests.java | 111 +++++++++++ .../create/AutoCreateIndexActionTests.java | 173 ------------------ .../xpack/security/authz/RBACEngine.java | 2 + 7 files changed, 173 insertions(+), 261 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java delete mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexActionTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 0d2f64f81ba4e..38835bfb9ebba 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -22,9 +22,8 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; @@ -41,10 +40,12 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -60,7 +61,7 @@ private AutoCreateAction() { super(NAME, Response::new); } - public static class Request extends MasterNodeReadRequest implements IndicesRequest { + public static class Request extends MasterNodeReadRequest implements CompositeIndicesRequest { private final Set names; private final String cause; @@ -93,16 +94,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalBoolean(preferV2Templates); } - @Override - public String[] indices() { - return names.toArray(new String[0]); - } - - @Override - public IndicesOptions indicesOptions() { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); - } - public Set getNames() { return names; } @@ -208,9 +199,8 @@ protected Response read(StreamInput in) throws IOException { protected void masterOperation(Task task, Request request, ClusterState state, - ActionListener listener) throws Exception { - // For now always redirect to the auto create index action, because only indices get auto created. - client.execute(AutoCreateIndexAction.INSTANCE, request, listener); + ActionListener listener) { + autoCreate(request, listener, client); } @Override @@ -219,4 +209,36 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } } + static void autoCreate(Request request, ActionListener listener, Client client) { + // For now always redirect to the auto create index action, because only indices get auto created. + final AtomicInteger counter = new AtomicInteger(request.getNames().size()); + final Map results = new HashMap<>(); + for (String name : request.getNames()) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(name); + createIndexRequest.cause(request.getCause()); + createIndexRequest.masterNodeTimeout(request.masterNodeTimeout()); + createIndexRequest.preferV2Templates(request.getPreferV2Templates()); + client.execute(AutoCreateIndexAction.INSTANCE, createIndexRequest, ActionListener.wrap( + createIndexResponse -> { + // Maybe a bit overkill to ensure visibility of results map across threads... + synchronized (results) { + results.put(name, null); + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(results)); + } + } + }, + e -> { + synchronized (results) { + results.put(name, e); + if (counter.decrementAndGet() == 0) { + listener.onResponse(new Response(results)); + } + } + } + )); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java index 756f3d4694d2c..e709124dcf063 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java @@ -18,43 +18,37 @@ */ package org.elasticsearch.action.admin.indices.create; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; /** * Api that auto creates an index that originate from requests that write into an index that doesn't yet exist. */ -public final class AutoCreateIndexAction extends ActionType { +public final class AutoCreateIndexAction extends ActionType { public static final AutoCreateIndexAction INSTANCE = new AutoCreateIndexAction(); public static final String NAME = "indices:admin/auto_create_index"; private AutoCreateIndexAction() { - super(NAME, AutoCreateAction.Response::new); + super(NAME, CreateIndexResponse::new); } - public static final class TransportAction extends TransportMasterNodeAction { + public static final class TransportAction extends TransportMasterNodeAction { private final MetadataCreateIndexService createIndexService; @@ -62,7 +56,7 @@ public static final class TransportAction extends TransportMasterNodeAction listener) throws Exception { - // Should this be an AckedClusterStateUpdateTask and - // should we add ActiveShardsObserver.waitForActiveShards(...) here? - // (This api used from TransportBulkAction only and it currently ignores CreateIndexResponse, so - // I think there is no need to include acked and shard ackeds here) - clusterService.submitStateUpdateTask("auto create resources for [" + request.getNames() + "]", - new ClusterStateUpdateTask(Priority.HIGH) { - - final Map result = new HashMap<>(); - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return autoCreate(request, result, currentState, createIndexService, indexNameExpressionResolver); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new AutoCreateAction.Response(result)); - } - }); + ActionListener listener) throws Exception { + TransportCreateIndexAction.innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService); } @Override - protected ClusterBlockException checkBlock(AutoCreateAction.Request request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getNames().toArray(new String[0])); - } - } - - static ClusterState autoCreate(AutoCreateAction.Request request, - Map result, - ClusterState currentState, - MetadataCreateIndexService createIndexService, - IndexNameExpressionResolver resolver) { - for (String indexName : request.getNames()) { - indexName = resolver.resolveDateMathExpression(indexName); - CreateIndexClusterStateUpdateRequest req = new CreateIndexClusterStateUpdateRequest(request.getCause(), - indexName, indexName).masterNodeTimeout(request.masterNodeTimeout()) - .preferV2Templates(request.getPreferV2Templates()); - try { - currentState = createIndexService.applyCreateIndexRequest(currentState, req, false); - result.put(indexName, null); - } catch (ResourceAlreadyExistsException e) { - // ignore resource already exists exception. - } catch (Exception e) { - result.put(indexName, e); - } + protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); } - return currentState; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index bfa99c768ffa5..a2089f68425cc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -71,14 +71,20 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt @Override protected void masterOperation(Task task, final CreateIndexRequest request, final ClusterState state, final ActionListener listener) { - String cause = request.cause(); - if (cause.length() == 0) { - cause = "api"; + if (request.cause().length() == 0) { + request.cause("api"); } + innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService); + } + + static void innerCreateIndex(CreateIndexRequest request, + ActionListener listener, + IndexNameExpressionResolver indexNameExpressionResolver, + MetadataCreateIndexService createIndexService) { final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); final CreateIndexClusterStateUpdateRequest updateRequest = - new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()) + new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .settings(request.settings()).mappings(request.mappings()) .aliases(request.aliases()) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index d1aae56305ab4..6d5180e9e7ed2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -243,7 +243,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener entry : response.getFailureByNames().entrySet()) { Exception e = entry.getValue(); - if (e == null) { + if (e == null || ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { continue; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java new file mode 100644 index 0000000000000..a6de848628e82 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.create; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.test.ESTestCase; +import org.mockito.Mockito; + +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class AutoCreateActionTests extends ESTestCase { + + public void testAutoCreate() { + AutoCreateAction.Request request = + new AutoCreateAction.Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); + Client client = mock(Client.class); + Mockito.doAnswer(invocationOnMock -> { + CreateIndexRequest createIndexRequest = (CreateIndexRequest) invocationOnMock.getArguments()[1]; + assertThat(createIndexRequest.index(), in(request.getNames())); + assertThat(createIndexRequest.cause(), equalTo(request.getCause())); + assertThat(createIndexRequest.preferV2Templates(), equalTo((request.getPreferV2Templates()))); + + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new CreateIndexResponse(true, true, createIndexRequest.index())); + return invocationOnMock; + }).when(client).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); + + AutoCreateAction.Response[] holder = new AutoCreateAction.Response[1]; + AutoCreateAction.autoCreate(request, + ActionListener.wrap(r -> holder[0] = r, e -> {throw new RuntimeException(e);}), client); + AutoCreateAction.Response result = holder[0]; + + assertThat(result.getFailureByNames().size(), equalTo(3)); + assertThat(result.getFailureByNames().containsKey("index1"), is(true)); + assertThat(result.getFailureByNames().get("index1"), nullValue()); + assertThat(result.getFailureByNames().containsKey("index2"), is(true)); + assertThat(result.getFailureByNames().get("index2"), nullValue()); + assertThat(result.getFailureByNames().containsKey("index3"), is(true)); + assertThat(result.getFailureByNames().get("index3"), nullValue()); + + verify(client, times(3)).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); + } + + public void testAutoCreateFailure() { + AutoCreateAction.Request request = + new AutoCreateAction.Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); + Client client = mock(Client.class); + Mockito.doAnswer(invocationOnMock -> { + CreateIndexRequest createIndexRequest = (CreateIndexRequest) invocationOnMock.getArguments()[1]; + assertThat(createIndexRequest.index(), in(request.getNames())); + assertThat(createIndexRequest.cause(), equalTo(request.getCause())); + assertThat(createIndexRequest.preferV2Templates(), equalTo((request.getPreferV2Templates()))); + + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + if ("index2".equals(createIndexRequest.index())) { + listener.onFailure(new Exception("fail!")); + } else { + listener.onResponse(new CreateIndexResponse(true, true, createIndexRequest.index())); + } + + return invocationOnMock; + }).when(client).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); + + AutoCreateAction.Response[] holder = new AutoCreateAction.Response[1]; + AutoCreateAction.autoCreate(request, + ActionListener.wrap(r -> holder[0] = r, e -> {throw new RuntimeException(e);}), client); + AutoCreateAction.Response result = holder[0]; + + assertThat(result.getFailureByNames().size(), equalTo(3)); + assertThat(result.getFailureByNames().containsKey("index1"), is(true)); + assertThat(result.getFailureByNames().get("index1"), nullValue()); + assertThat(result.getFailureByNames().containsKey("index2"), is(true)); + assertThat(result.getFailureByNames().get("index2"), notNullValue()); + assertThat(result.getFailureByNames().get("index2").getMessage(), equalTo("fail!")); + assertThat(result.getFailureByNames().containsKey("index3"), is(true)); + assertThat(result.getFailureByNames().get("index3"), nullValue()); + + verify(client, times(3)).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexActionTests.java deleted file mode 100644 index 1e154d789889e..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexActionTests.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.indices.create; - -import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.Version; -import org.elasticsearch.action.admin.indices.create.AutoCreateAction.Request; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.in; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class AutoCreateIndexActionTests extends ESTestCase { - - private MetadataCreateIndexService mcis; - private IndexNameExpressionResolver resolver; - - @Before - public void setup() { - mcis = mock(MetadataCreateIndexService.class); - resolver = new IndexNameExpressionResolver(); - } - - public void testAutoCreate() throws Exception { - ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); - Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - expectApplyCreateIndexRequest(mcis, request, Set.of()); - - Map result = new HashMap<>(); - cs = AutoCreateIndexAction.autoCreate(request, result, cs, mcis, resolver); - assertThat(cs.metadata().indices().size(), equalTo(3)); - assertThat(cs.metadata().index("index1"), notNullValue()); - assertThat(cs.metadata().index("index2"), notNullValue()); - assertThat(cs.metadata().index("index3"), notNullValue()); - assertThat(result.size(), equalTo(3)); - assertThat(result.containsKey("index1"), is(true)); - assertThat(result.get("index1"), nullValue()); - assertThat(result.containsKey("index2"), is(true)); - assertThat(result.get("index2"), nullValue()); - assertThat(result.containsKey("index3"), is(true)); - assertThat(result.get("index3"), nullValue()); - - verify(mcis, times(3)).applyCreateIndexRequest(any(ClusterState.class), - any(CreateIndexClusterStateUpdateRequest.class), eq(false)); - } - - public void testAutoCreateIndexAlreadyExists() throws Exception { - ClusterState cs = ClusterState.builder(new ClusterName("_name")).metadata(Metadata.builder() - .put(IndexMetadata.builder("index2") - .settings(Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .build()) - .numberOfShards(1) - .numberOfReplicas(1) - .build(), false)).build(); - Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - expectApplyCreateIndexRequest(mcis, request, Set.of()); - - Map result = new HashMap<>(); - cs = AutoCreateIndexAction.autoCreate(request, result, cs, mcis, resolver); - assertThat(cs.metadata().indices().size(), equalTo(3)); - assertThat(cs.metadata().index("index1"), notNullValue()); - assertThat(cs.metadata().index("index2"), notNullValue()); - assertThat(cs.metadata().index("index3"), notNullValue()); - assertThat(result.size(), equalTo(2)); - assertThat(result.containsKey("index1"), is(true)); - assertThat(result.get("index1"), nullValue()); - assertThat(result.containsKey("index2"), is(false)); - assertThat(result.get("index2"), nullValue()); - assertThat(result.containsKey("index3"), is(true)); - assertThat(result.get("index3"), nullValue()); - - verify(mcis, times(3)).applyCreateIndexRequest(any(ClusterState.class), - any(CreateIndexClusterStateUpdateRequest.class), eq(false)); - } - - public void testAutoCreateFailure() throws Exception { - ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); - Request request = new Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - expectApplyCreateIndexRequest(mcis, request, Set.of("index2")); - - Map result = new HashMap<>(); - cs = AutoCreateIndexAction.autoCreate(request, result, cs, mcis, resolver); - assertThat(cs.metadata().indices().size(), equalTo(2)); - assertThat(cs.metadata().index("index1"), notNullValue()); - assertThat(cs.metadata().index("index2"), nullValue()); - assertThat(cs.metadata().index("index3"), notNullValue()); - assertThat(result.size(), equalTo(3)); - assertThat(result.containsKey("index1"), is(true)); - assertThat(result.get("index1"), nullValue()); - assertThat(result.containsKey("index2"), is(true)); - assertThat(result.get("index2"), notNullValue()); - assertThat(result.get("index2").getMessage(), equalTo("fail!")); - assertThat(result.containsKey("index3"), is(true)); - assertThat(result.get("index3"), nullValue()); - - verify(mcis, times(3)).applyCreateIndexRequest(any(ClusterState.class), - any(CreateIndexClusterStateUpdateRequest.class), eq(false)); - } - - private void expectApplyCreateIndexRequest(MetadataCreateIndexService mcis, - Request request, - Set indicesToFail) throws Exception { - when(mcis.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), eq(false))) - .thenAnswer(mockInvocation -> { - ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; - CreateIndexClusterStateUpdateRequest updateRequest = - (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1]; - assertThat(updateRequest.index(), in(request.getNames())); - assertThat(updateRequest.getProvidedName(), in(request.getNames())); - assertThat(updateRequest.cause(), equalTo(request.getCause())); - assertThat(updateRequest.masterNodeTimeout(), equalTo(request.masterNodeTimeout())); - assertThat(updateRequest.preferV2Templates(), equalTo(request.getPreferV2Templates())); - - if (currentState.metadata().hasIndex(updateRequest.index())) { - throw new ResourceAlreadyExistsException(currentState.metadata().index(updateRequest.index()).getIndex()); - } - - if (indicesToFail.contains(updateRequest.index())) { - throw new Exception("fail!"); - } - - Metadata.Builder b = Metadata.builder(currentState.metadata()) - .put(IndexMetadata.builder(updateRequest.index()) - .settings(Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(updateRequest.settings()) - .build()) - .numberOfShards(1) - .numberOfReplicas(1) - .build(), false); - return ClusterState.builder(currentState).metadata(b.build()).build(); - }); - } - -} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 81b35e4dfb163..3536b42b94759 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.delete.DeleteAction; @@ -200,6 +201,7 @@ boolean checkSameUserPermissions(String action, TransportRequest request, Authen private static boolean shouldAuthorizeIndexActionNameOnly(String action, TransportRequest request) { switch (action) { + case AutoCreateAction.NAME: case BulkAction.NAME: case IndexAction.NAME: case DeleteAction.NAME: From 070df7114f8c7ac3687f04cdba03ed15d78b036e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 29 Apr 2020 13:27:36 +0200 Subject: [PATCH 07/10] fixed test --- x-pack/plugin/ml/qa/ml-with-security/roles.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/ml-with-security/roles.yml b/x-pack/plugin/ml/qa/ml-with-security/roles.yml index 48c4abb9f4262..395d7c23b93af 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/roles.yml +++ b/x-pack/plugin/ml/qa/ml-with-security/roles.yml @@ -9,7 +9,7 @@ minimal: # non-ML indices - names: [ 'airline-data', 'index-*', 'unavailable-data', 'utopia' ] privileges: - - indices:admin/create + - create_index - indices:admin/refresh - read - index From 7095f79fe245900b3669df7f16f8100f465b2d5d Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 3 May 2020 22:28:48 +0200 Subject: [PATCH 08/10] Simplify auto create index logic: * One action for auto creating indices and in the future also auto create data streams * The TransportBulkAction just executes the auto create action like the create index action. * Avoid the complexity of auto creating indices in a single request / cluster state update. This adds quite some complexity while the benefits are likely only noticeable in edge cases (if many indices get auto created)) Also on the security side, authorization of the auto create indices would become much more complex compared to authorization of auto creating an index one at a time. --- .../elasticsearch/action/ActionModule.java | 2 - .../indices/create/AutoCreateAction.java | 191 ++---------------- .../action/bulk/TransportBulkAction.java | 124 ++++-------- .../indices/create/AutoCreateActionTests.java | 111 ---------- .../create/AutoCreateRequestTests.java | 36 ---- .../create/AutoCreateResponseTests.java | 44 ---- ...ActionIndicesThatCannotBeCreatedTests.java | 10 +- .../bulk/TransportBulkActionIngestTests.java | 12 +- .../action/bulk/TransportBulkActionTests.java | 12 +- .../authz/privilege/IndexPrivilege.java | 4 +- .../security/authz/AuthorizationService.java | 11 +- .../xpack/security/authz/RBACEngine.java | 1 - .../security/authz/WriteActionsTests.java | 2 +- 13 files changed, 59 insertions(+), 501 deletions(-) delete mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java delete mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java delete mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index c88405e7a6f6d..e5f5c74bb0a80 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; -import org.elasticsearch.action.admin.indices.create.AutoCreateIndexAction; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; @@ -590,7 +589,6 @@ public void reg actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class); actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class); actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class); - actions.register(AutoCreateIndexAction.INSTANCE, AutoCreateIndexAction.TransportAction.class); //Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 38835bfb9ebba..6fff3b3ba1c79 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -19,170 +19,45 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; /** - * Proxy action for auto creating an indexable resource. - * Currently only auto creates indices by redirecting to {@link AutoCreateIndexAction}. + * Api that auto creates an index that originate from requests that write into an index that doesn't yet exist. */ -public final class AutoCreateAction extends ActionType { +public final class AutoCreateAction extends ActionType { public static final AutoCreateAction INSTANCE = new AutoCreateAction(); public static final String NAME = "indices:admin/auto_create"; private AutoCreateAction() { - super(NAME, Response::new); + super(NAME, CreateIndexResponse::new); } - public static class Request extends MasterNodeReadRequest implements CompositeIndicesRequest { + public static final class TransportAction extends TransportMasterNodeAction { - private final Set names; - private final String cause; - private final Boolean preferV2Templates; - - public Request(Set names, String cause, Boolean preferV2Templates) { - this.names = Objects.requireNonNull(names); - this.cause = Objects.requireNonNull(cause); - this.preferV2Templates = preferV2Templates; - assert names.size() != 0; - } - - public Request(StreamInput in) throws IOException { - super(in); - this.names = in.readSet(StreamInput::readString); - this.cause = in.readString(); - this.preferV2Templates = in.readOptionalBoolean(); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringCollection(names); - out.writeString(cause); - out.writeOptionalBoolean(preferV2Templates); - } - - public Set getNames() { - return names; - } - - public String getCause() { - return cause; - } - - public Boolean getPreferV2Templates() { - return preferV2Templates; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Request request = (Request) o; - return names.equals(request.names) && - Objects.equals(preferV2Templates, request.preferV2Templates); - } - - @Override - public int hashCode() { - return Objects.hash(names, preferV2Templates); - } - } - - public static class Response extends ActionResponse { - - private final Map failureByNames; - - public Response(Map failureByNames) { - this.failureByNames = failureByNames; - } - - public Response(StreamInput in) throws IOException { - super(in); - failureByNames = in.readMap(StreamInput::readString, StreamInput::readException); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeMap(failureByNames, StreamOutput::writeString, StreamOutput::writeException); - } - - public Map getFailureByNames() { - return failureByNames; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Response response = (Response) o; - /* - * Exception does not implement equals(...) so we will compute the hash code based on the key set and the - * messages. - */ - return Objects.equals(getKeys(failureByNames), getKeys(response.failureByNames)) && - Objects.equals(getExceptionMessages(failureByNames), getExceptionMessages(response.failureByNames)); - } - - @Override - public int hashCode() { - /* - * Exception does not implement hash code so we will compute the hash code based on the key set and the - * messages. - */ - return Objects.hash(getKeys(failureByNames), getExceptionMessages(failureByNames)); - } - - private static List getExceptionMessages(final Map map) { - return map.values().stream().map(Throwable::getMessage).sorted(String::compareTo).collect(Collectors.toList()); - } - - private static List getKeys(final Map map) { - return map.keySet().stream().sorted(String::compareTo).collect(Collectors.toList()); - } - } - - public static final class TransportAction extends TransportMasterNodeAction { - - private final Client client; + private final MetadataCreateIndexService createIndexService; @Inject public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client) { - super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); - this.client = client; + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataCreateIndexService createIndexService) { + super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver); + this.createIndexService = createIndexService; } @Override @@ -191,53 +66,21 @@ protected String executor() { } @Override - protected Response read(StreamInput in) throws IOException { - return new Response(in); + protected CreateIndexResponse read(StreamInput in) throws IOException { + return new CreateIndexResponse(in); } @Override protected void masterOperation(Task task, - Request request, + CreateIndexRequest request, ClusterState state, - ActionListener listener) { - autoCreate(request, listener, client); + ActionListener listener) { + TransportCreateIndexAction.innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService); } @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, request.names.toArray(new String[0])); - } - } - - static void autoCreate(Request request, ActionListener listener, Client client) { - // For now always redirect to the auto create index action, because only indices get auto created. - final AtomicInteger counter = new AtomicInteger(request.getNames().size()); - final Map results = new HashMap<>(); - for (String name : request.getNames()) { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(); - createIndexRequest.index(name); - createIndexRequest.cause(request.getCause()); - createIndexRequest.masterNodeTimeout(request.masterNodeTimeout()); - createIndexRequest.preferV2Templates(request.getPreferV2Templates()); - client.execute(AutoCreateIndexAction.INSTANCE, createIndexRequest, ActionListener.wrap( - createIndexResponse -> { - // Maybe a bit overkill to ensure visibility of results map across threads... - synchronized (results) { - results.put(name, null); - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(results)); - } - } - }, - e -> { - synchronized (results) { - results.put(name, e); - if (counter.decrementAndGet() == 0) { - listener.onResponse(new Response(results)); - } - } - } - )); + protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 6d5180e9e7ed2..dea883ca62a32 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -57,7 +57,6 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -87,7 +86,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; -import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -238,91 +236,36 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener handler = response -> { - Exception suppressed = null; - for (Map.Entry entry : response.getFailureByNames().entrySet()) { - Exception e = entry.getValue(); - if (e == null || ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) { - continue; + final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); + for (String index : autoCreateIndices) { + createIndex(index, bulkRequest.preferV2Templates(), bulkRequest.timeout(), minNodeVersion, new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse result) { + if (counter.decrementAndGet() == 0) { + threadPool.executor(ThreadPool.Names.WRITE).execute( + () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); } - - String index = entry.getKey(); - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { - bulkRequest.requests.set(i, null); - } - } - - if (suppressed == null) { - suppressed = e; - } else { - suppressed.addSuppressed(e); - } - } - - final ActionListener finalListener; - if (suppressed != null) { - final Exception e = suppressed; - finalListener = ActionListener.wrap(listener::onResponse, inner -> { - inner.addSuppressed(e); - listener.onFailure(inner); - }); - } else { - finalListener = listener; - } - - threadPool.executor(ThreadPool.Names.WRITE).execute( - () -> executeBulk(task, bulkRequest, startTime, finalListener, responses, indicesThatCannotBeCreated)); - }; - Consumer failureHandler = e -> { - // major failure, set error on all bulk items and then return bulk response - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(), e); - responses.set(i, new BulkItemResponse(i, request.opType(), failure)); - bulkRequest.requests.set(i, null); } - executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { - inner.addSuppressed(e); - listener.onFailure(inner); - }), responses, indicesThatCannotBeCreated); - }; - autoCreate(autoCreateIndices, bulkRequest.preferV2Templates(), bulkRequest.timeout(), - ActionListener.wrap(handler, failureHandler)); - } else { - final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); - for (String index : autoCreateIndices) { - createIndex(index, bulkRequest.preferV2Templates(), bulkRequest.timeout(), new ActionListener<>() { - @Override - public void onResponse(CreateIndexResponse result) { - if (counter.decrementAndGet() == 0) { - threadPool.executor(ThreadPool.Names.WRITE).execute( - () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); - } - } - @Override - public void onFailure(Exception e) { - if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { - // fail all requests involving this index, if create didn't work - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest request = bulkRequest.requests.get(i); - if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { - bulkRequest.requests.set(i, null); - } + @Override + public void onFailure(Exception e) { + if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { + // fail all requests involving this index, if create didn't work + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest request = bulkRequest.requests.get(i); + if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { + bulkRequest.requests.set(i, null); } } - if (counter.decrementAndGet() == 0) { - executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { - inner.addSuppressed(e); - listener.onFailure(inner); - }), responses, indicesThatCannotBeCreated); - } } - }); - } + if (counter.decrementAndGet() == 0) { + executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { + inner.addSuppressed(e); + listener.onFailure(inner); + }), responses, indicesThatCannotBeCreated); + } + } + }); } } } else { @@ -439,20 +382,21 @@ boolean shouldAutoCreate(String index, ClusterState state) { return autoCreateIndex.shouldAutoCreate(index, state); } - void createIndex(String index, Boolean preferV2Templates, TimeValue timeout, ActionListener listener) { + void createIndex(String index, + Boolean preferV2Templates, + TimeValue timeout, + Version minNodeVersion, + ActionListener listener) { CreateIndexRequest createIndexRequest = new CreateIndexRequest(); createIndexRequest.index(index); createIndexRequest.cause("auto(bulk api)"); createIndexRequest.masterNodeTimeout(timeout); createIndexRequest.preferV2Templates(preferV2Templates); - client.admin().indices().create(createIndexRequest, listener); - } - - void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, - ActionListener listener) { - AutoCreateAction.Request request = new AutoCreateAction.Request(names, "auto(bulk api)", preferV2Templates); - request.masterNodeTimeout(timeout); - client.execute(AutoCreateAction.INSTANCE, request, listener); + if (minNodeVersion.onOrAfter(Version.V_8_0_0)) { + client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener); + } else { + client.admin().indices().create(createIndexRequest, listener); + } } private boolean setResponseFailureIfIndexMatches(AtomicArray responses, int idx, DocWriteRequest request, diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java deleted file mode 100644 index a6de848628e82..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateActionTests.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.indices.create; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.test.ESTestCase; -import org.mockito.Mockito; - -import java.util.Set; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.in; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -public class AutoCreateActionTests extends ESTestCase { - - public void testAutoCreate() { - AutoCreateAction.Request request = - new AutoCreateAction.Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - Client client = mock(Client.class); - Mockito.doAnswer(invocationOnMock -> { - CreateIndexRequest createIndexRequest = (CreateIndexRequest) invocationOnMock.getArguments()[1]; - assertThat(createIndexRequest.index(), in(request.getNames())); - assertThat(createIndexRequest.cause(), equalTo(request.getCause())); - assertThat(createIndexRequest.preferV2Templates(), equalTo((request.getPreferV2Templates()))); - - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(new CreateIndexResponse(true, true, createIndexRequest.index())); - return invocationOnMock; - }).when(client).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); - - AutoCreateAction.Response[] holder = new AutoCreateAction.Response[1]; - AutoCreateAction.autoCreate(request, - ActionListener.wrap(r -> holder[0] = r, e -> {throw new RuntimeException(e);}), client); - AutoCreateAction.Response result = holder[0]; - - assertThat(result.getFailureByNames().size(), equalTo(3)); - assertThat(result.getFailureByNames().containsKey("index1"), is(true)); - assertThat(result.getFailureByNames().get("index1"), nullValue()); - assertThat(result.getFailureByNames().containsKey("index2"), is(true)); - assertThat(result.getFailureByNames().get("index2"), nullValue()); - assertThat(result.getFailureByNames().containsKey("index3"), is(true)); - assertThat(result.getFailureByNames().get("index3"), nullValue()); - - verify(client, times(3)).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); - } - - public void testAutoCreateFailure() { - AutoCreateAction.Request request = - new AutoCreateAction.Request(Set.of("index1", "index2", "index3"), "reason", randomBoolean() ? null : randomBoolean()); - Client client = mock(Client.class); - Mockito.doAnswer(invocationOnMock -> { - CreateIndexRequest createIndexRequest = (CreateIndexRequest) invocationOnMock.getArguments()[1]; - assertThat(createIndexRequest.index(), in(request.getNames())); - assertThat(createIndexRequest.cause(), equalTo(request.getCause())); - assertThat(createIndexRequest.preferV2Templates(), equalTo((request.getPreferV2Templates()))); - - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - if ("index2".equals(createIndexRequest.index())) { - listener.onFailure(new Exception("fail!")); - } else { - listener.onResponse(new CreateIndexResponse(true, true, createIndexRequest.index())); - } - - return invocationOnMock; - }).when(client).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); - - AutoCreateAction.Response[] holder = new AutoCreateAction.Response[1]; - AutoCreateAction.autoCreate(request, - ActionListener.wrap(r -> holder[0] = r, e -> {throw new RuntimeException(e);}), client); - AutoCreateAction.Response result = holder[0]; - - assertThat(result.getFailureByNames().size(), equalTo(3)); - assertThat(result.getFailureByNames().containsKey("index1"), is(true)); - assertThat(result.getFailureByNames().get("index1"), nullValue()); - assertThat(result.getFailureByNames().containsKey("index2"), is(true)); - assertThat(result.getFailureByNames().get("index2"), notNullValue()); - assertThat(result.getFailureByNames().get("index2").getMessage(), equalTo("fail!")); - assertThat(result.getFailureByNames().containsKey("index3"), is(true)); - assertThat(result.getFailureByNames().get("index3"), nullValue()); - - verify(client, times(3)).execute(same(AutoCreateIndexAction.INSTANCE), any(), any()); - } - -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java deleted file mode 100644 index e235e0a98ae3a..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateRequestTests.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.indices.create; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -public class AutoCreateRequestTests extends AbstractWireSerializingTestCase { - - @Override - protected Writeable.Reader instanceReader() { - return AutoCreateAction.Request::new; - } - - @Override - protected AutoCreateAction.Request createTestInstance() { - return new AutoCreateAction.Request(randomUnique(() -> randomAlphaOfLength(4), 4), randomAlphaOfLength(4), - randomBoolean()); - } -} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java deleted file mode 100644 index 1135a86044bb2..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/AutoCreateResponseTests.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.indices.create; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - -import java.util.NavigableMap; -import java.util.TreeMap; - -public class AutoCreateResponseTests extends AbstractWireSerializingTestCase { - - @Override - protected Writeable.Reader instanceReader() { - return AutoCreateAction.Response::new; - } - - @Override - protected AutoCreateAction.Response createTestInstance() { - int numEntries = randomIntBetween(0, 16); - NavigableMap map = new TreeMap<>(); - for (int i = 0; i < numEntries; i++) { - map.put(randomAlphaOfLength(4), new ElasticsearchException(randomAlphaOfLength(4))); - } - return new AutoCreateAction.Response(map); - } -} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 5d0448a9279a1..9b98bcf08cb7f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -140,17 +139,10 @@ boolean shouldAutoCreate(String index, ClusterState state) { @Override void createIndex(String index, Boolean preferV2Templates, - TimeValue timeout, ActionListener listener) { + TimeValue timeout, Version minNodeVersion, ActionListener listener) { // If we try to create an index just immediately assume it worked listener.onResponse(new CreateIndexResponse(true, true, index) {}); } - - @Override - void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, - ActionListener listener) { - // If we try to create an index just immediately assume it worked - listener.onResponse(new AutoCreateAction.Response(Map.of())); - } }; action.doExecute(null, bulkRequest, null); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 3212e53478040..85f541711ccb2 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; @@ -69,7 +68,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -161,17 +159,11 @@ void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeN @Override void createIndex(String index, Boolean preferV2Templates, - TimeValue timeout, ActionListener listener) { + TimeValue timeout, Version minNodeVersion, + ActionListener listener) { indexCreated = true; listener.onResponse(null); } - - @Override - void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, - ActionListener listener) { - indexCreated = true; - listener.onResponse(new AutoCreateAction.Response(Map.of())); - } } class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index a41138b67d5ce..d524f7b0b5f17 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver; import org.elasticsearch.action.delete.DeleteRequest; @@ -54,8 +53,6 @@ import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; @@ -88,17 +85,10 @@ protected boolean needToCheck() { @Override void createIndex(String index, Boolean preferV2Templates, - TimeValue timeout, ActionListener listener) { + TimeValue timeout, Version minNodeVersion, ActionListener listener) { indexCreated = true; listener.onResponse(null); } - - @Override - void autoCreate(Set names, Boolean preferV2Templates, TimeValue timeout, - ActionListener listener) { - indexCreated = true; - listener.onResponse(new AutoCreateAction.Response(Map.of())); - } } @Before diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java index d35a1e7a9f8b6..234b89773b992 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction; import org.elasticsearch.action.admin.indices.close.CloseIndexAction; import org.elasticsearch.action.admin.indices.create.AutoCreateAction; -import org.elasticsearch.action.admin.indices.create.AutoCreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexAction; @@ -59,8 +58,7 @@ public final class IndexPrivilege extends Privilege { private static final Automaton MONITOR_AUTOMATON = patterns("indices:monitor/*"); private static final Automaton MANAGE_AUTOMATON = unionAndMinimize(Arrays.asList(MONITOR_AUTOMATON, patterns("indices:admin/*"))); - private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME, AutoCreateAction.NAME, - AutoCreateIndexAction.NAME); + private static final Automaton CREATE_INDEX_AUTOMATON = patterns(CreateIndexAction.NAME, AutoCreateAction.NAME); private static final Automaton DELETE_INDEX_AUTOMATON = patterns(DeleteIndexAction.NAME); private static final Automaton VIEW_METADATA_AUTOMATON = patterns(GetAliasesAction.NAME, GetIndexAction.NAME, GetFieldMappingsAction.NAME + "*", GetMappingsAction.NAME, diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 16d5b6a1acc46..afccfc4279e0b 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -292,15 +292,8 @@ private void handleIndexActionAuthorizationResult(final IndexAuthorizationResult } //if we are creating an index we need to authorize potential aliases created at the same time if (IndexPrivilege.CREATE_INDEX_MATCHER.test(action)) { - final Set aliases; - if (request instanceof CreateIndexRequest) { - aliases = ((CreateIndexRequest) request).aliases(); - } else if (request instanceof AutoCreateAction.Request) { - aliases = Set.of(); - } else { - aliases = Set.of(); - assert false; - } + assert request instanceof CreateIndexRequest; + Set aliases = ((CreateIndexRequest) request).aliases(); if (aliases.isEmpty()) { runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener); } else { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 3536b42b94759..552952d752606 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -201,7 +201,6 @@ boolean checkSameUserPermissions(String action, TransportRequest request, Authen private static boolean shouldAuthorizeIndexActionNameOnly(String action, TransportRequest request) { switch (action) { - case AutoCreateAction.NAME: case BulkAction.NAME: case IndexAction.NAME: case DeleteAction.NAME: diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java index 7c26e6023c9d2..7eb411fbe0276 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/WriteActionsTests.java @@ -37,7 +37,7 @@ protected String configRoles() { " indices:\n" + " - names: 'missing'\n" + " privileges: [ 'indices:admin/create', 'indices:admin/auto_create', " + - "'indices:admin/auto_create_index', 'indices:admin/delete' ]\n" + + "'indices:admin/delete' ]\n" + " - names: ['/index.*/']\n" + " privileges: [ manage ]\n" + " - names: ['/test.*/']\n" + From 58575da2e3babda8a1292410bdf84c7ba06966b2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 3 May 2020 22:33:02 +0200 Subject: [PATCH 09/10] removed unused imports and class --- .../indices/create/AutoCreateIndexAction.java | 88 ------------------- .../security/authz/AuthorizationService.java | 1 - .../xpack/security/authz/RBACEngine.java | 1 - 3 files changed, 90 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java deleted file mode 100644 index e709124dcf063..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateIndexAction.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.indices.create; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.io.IOException; - -/** - * Api that auto creates an index that originate from requests that write into an index that doesn't yet exist. - */ -public final class AutoCreateIndexAction extends ActionType { - - public static final AutoCreateIndexAction INSTANCE = new AutoCreateIndexAction(); - public static final String NAME = "indices:admin/auto_create_index"; - - private AutoCreateIndexAction() { - super(NAME, CreateIndexResponse::new); - } - - public static final class TransportAction extends TransportMasterNodeAction { - - private final MetadataCreateIndexService createIndexService; - - @Inject - public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - MetadataCreateIndexService createIndexService) { - super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, - indexNameExpressionResolver); - this.createIndexService = createIndexService; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected CreateIndexResponse read(StreamInput in) throws IOException { - return new CreateIndexResponse(in); - } - - @Override - protected void masterOperation(Task task, - CreateIndexRequest request, - ClusterState state, - ActionListener listener) throws Exception { - TransportCreateIndexAction.innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService); - } - - @Override - protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) { - return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); - } - } - -} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index afccfc4279e0b..26b07beb4838d 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; -import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkShardRequest; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index 552952d752606..81b35e4dfb163 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -15,7 +15,6 @@ import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.action.admin.indices.create.AutoCreateAction; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.delete.DeleteAction; From fa36c850fbfcbcd98eb20cdb3ed903ee06fbcedc Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 4 May 2020 16:11:51 +0200 Subject: [PATCH 10/10] Added integration test for auto index creation and the auto_create index privilege --- .../test/security/authz/15_auto_create.yml | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/security/authz/15_auto_create.yml diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/security/authz/15_auto_create.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/security/authz/15_auto_create.yml new file mode 100644 index 0000000000000..561f30afd3fab --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/security/authz/15_auto_create.yml @@ -0,0 +1,85 @@ +--- +setup: + - skip: + features: headers + + - do: + cluster.health: + wait_for_status: yellow + + - do: + security.put_role: + name: "append_logs" + body: > + { + "indices": [ + { "names": ["logs-foobar" ], "privileges": ["create_doc", "create_index"] }, + { "names": ["logs-*" ], "privileges": ["create_doc"] } + ] + } + + - do: + security.put_user: + username: "test_user" + body: > + { + "password" : "x-pack-test-password", + "roles" : [ "append_logs" ], + "full_name" : "user with mixed privileges to multiple indices" + } + +--- +teardown: + - do: + security.delete_user: + username: "test_user" + ignore: 404 + + - do: + security.delete_role: + name: "append_logs" + ignore: 404 + +--- +"Test auto index creation": + # Only auto creation of logs-foobar index works. + - do: + headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user + bulk: + body: + - '{"create": {"_index": "logs-foobar"}}' + - '{}' + - '{"create": {"_index": "logs-barbaz"}}' + - '{}' + - match: { errors: true } + - match: { items.0.create.status: 201 } + - match: { items.1.create.status: 403 } + + - do: # superuser + indices.refresh: + index: "_all" + + - do: # superuser + search: + rest_total_hits_as_int: true + index: "logs-*" + - match: { hits.total: 1 } + + # Create the logs-barbaz with the superuser + - do: # superuser + indices.create: + index: logs-barbaz + body: {} + + # Ensure that just appending data via both indices work now that the indices have been auto created + - do: + headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user + bulk: + body: + - '{"create": {"_index": "logs-foobar"}}' + - '{}' + - '{"create": {"_index": "logs-barbaz"}}' + - '{}' + - match: { errors: false } + - match: { items.0.create.status: 201 } + - match: { items.1.create.status: 201 }