Skip to content

Commit

Permalink
Fix security index auto-create and state recovery race (#39582)
Browse files Browse the repository at this point in the history
Previously, the security index could be wrongfully recreated. This might
happen if the index was interpreted as missing, as in the case of a fresh
install, but the index existed and the state did not yet recover.

This fix will return HTTP SERVICE_UNAVAILABLE (503) for requests that
try to write to the security index before the state has not been recovered yet.
  • Loading branch information
albertzaharovits authored Mar 5, 2019
1 parent c71807a commit fd22c80
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -40,6 +41,7 @@
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames;
import org.elasticsearch.xpack.core.template.TemplateUtils;

Expand Down Expand Up @@ -81,7 +83,7 @@ public class SecurityIndexManager implements ClusterStateListener {
private volatile State indexState;

public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
this(client, indexName, new State(false, false, false, false, null, null, null));
this(client, indexName, State.UNRECOVERED_STATE);
clusterService.addListener(this);
}

Expand Down Expand Up @@ -121,6 +123,10 @@ public boolean isMappingUpToDate() {
return this.indexState.mappingUpToDate;
}

public boolean isStateRecovered() {
return this.indexState != State.UNRECOVERED_STATE;
}

public ElasticsearchException getUnavailableReason() {
final State localState = this.indexState;
if (localState.indexAvailable) {
Expand Down Expand Up @@ -297,7 +303,10 @@ public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, fin
public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
if (indexState == State.UNRECOVERED_STATE) {
consumer.accept(new ElasticsearchStatusException("Cluster state has not been recovered yet, cannot write to the security index",
RestStatus.SERVICE_UNAVAILABLE));
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
Expand Down Expand Up @@ -377,6 +386,7 @@ public static boolean isIndexDeleted(State previousState, State currentState) {
* State of the security index.
*/
public static class State {
public static final State UNRECOVERED_STATE = new State(false, false, false, false, null, null, null);
public final boolean indexExists;
public final boolean isIndexUpToDate;
public final boolean indexAvailable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
Expand All @@ -40,10 +42,13 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.test.SecurityTestUtils;
Expand All @@ -55,6 +60,10 @@
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.TEMPLATE_VERSION_PATTERN;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -72,6 +81,7 @@ public void setUpManager() {
final Client mockClient = mock(Client.class);
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService());
when(mockClient.threadPool()).thenReturn(threadPool);
when(mockClient.settings()).thenReturn(Settings.EMPTY);
final ClusterService clusterService = mock(ClusterService.class);
Expand Down Expand Up @@ -192,6 +202,67 @@ public void testIndexHealthChangeListeners() throws Exception {
assertEquals(ClusterHealthStatus.GREEN, currentState.get().indexStatus);
}

public void testWriteBeforeStateNotRecovered() throws Exception {
final AtomicBoolean prepareRunnableCalled = new AtomicBoolean(false);
final AtomicReference<Exception> prepareException = new AtomicReference<>(null);
manager.prepareIndexIfNeededThenExecute(ex -> {
prepareException.set(ex);
}, () -> {
prepareRunnableCalled.set(true);
});
assertThat(prepareException.get(), is(notNullValue()));
assertThat(prepareException.get(), instanceOf(ElasticsearchStatusException.class));
assertThat(((ElasticsearchStatusException)prepareException.get()).status(), is(RestStatus.SERVICE_UNAVAILABLE));
assertThat(prepareRunnableCalled.get(), is(false));
prepareException.set(null);
prepareRunnableCalled.set(false);
// state not recovered
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME).blocks(blocks)));
manager.prepareIndexIfNeededThenExecute(ex -> {
prepareException.set(ex);
}, () -> {
prepareRunnableCalled.set(true);
});
assertThat(prepareException.get(), is(notNullValue()));
assertThat(prepareException.get(), instanceOf(ElasticsearchStatusException.class));
assertThat(((ElasticsearchStatusException)prepareException.get()).status(), is(RestStatus.SERVICE_UNAVAILABLE));
assertThat(prepareRunnableCalled.get(), is(false));
prepareException.set(null);
prepareRunnableCalled.set(false);
// state recovered with index
ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME,
SecurityIndexManager.INTERNAL_INDEX_FORMAT);
markShardsAvailable(clusterStateBuilder);
manager.clusterChanged(event(clusterStateBuilder));
manager.prepareIndexIfNeededThenExecute(ex -> {
prepareException.set(ex);
}, () -> {
prepareRunnableCalled.set(true);
});
assertThat(prepareException.get(), is(nullValue()));
assertThat(prepareRunnableCalled.get(), is(true));
}

public void testListeneredNotCalledBeforeStateNotRecovered() throws Exception {
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
manager.addIndexStateListener((prev, current) -> {
listenerCalled.set(true);
});
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
// state not recovered
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME).blocks(blocks)));
assertThat(manager.isStateRecovered(), is(false));
assertThat(listenerCalled.get(), is(false));
// state recovered with index
ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME,
SecurityIndexManager.INTERNAL_INDEX_FORMAT);
markShardsAvailable(clusterStateBuilder);
manager.clusterChanged(event(clusterStateBuilder));
assertThat(manager.isStateRecovered(), is(true));
assertThat(listenerCalled.get(), is(true));
}

public void testIndexOutOfDateListeners() throws Exception {
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
Expand Down Expand Up @@ -236,12 +307,14 @@ private void assertInitialState() {
assertThat(manager.indexExists(), Matchers.equalTo(false));
assertThat(manager.isAvailable(), Matchers.equalTo(false));
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(false));
assertThat(manager.isStateRecovered(), Matchers.equalTo(false));
}

private void assertIndexUpToDateButNotAvailable() {
assertThat(manager.indexExists(), Matchers.equalTo(true));
assertThat(manager.isAvailable(), Matchers.equalTo(false));
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(true));
assertThat(manager.isStateRecovered(), Matchers.equalTo(true));
}

public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
Expand Down

0 comments on commit fd22c80

Please sign in to comment.