Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix security index auto-create and state recovery race #39582

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -40,6 +41,7 @@
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.security.index.RestrictedIndicesNames;
import org.elasticsearch.xpack.core.template.TemplateUtils;

Expand Down Expand Up @@ -81,7 +83,7 @@ public class SecurityIndexManager implements ClusterStateListener {
private volatile State indexState;

/**
 * Creates a manager for the given index and registers it as a listener on the cluster service.
 *
 * @param client         client handed to the delegate constructor
 * @param indexName      name of the index this manager tracks
 * @param clusterService cluster service on which this manager is registered as a listener
 */
public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
    // Start from the shared sentinel instance rather than a freshly built State:
    // isStateRecovered() and prepareIndexIfNeededThenExecute() compare against
    // State.UNRECOVERED_STATE by reference, so a separate-but-equal instance would not match.
    this(client, indexName, State.UNRECOVERED_STATE);
    clusterService.addListener(this);
}

Expand Down Expand Up @@ -121,6 +123,10 @@ public boolean isMappingUpToDate() {
return this.indexState.mappingUpToDate;
}

/**
 * Reports whether the manager has moved past its initial, unrecovered state.
 *
 * @return {@code false} while the current index state is the {@link State#UNRECOVERED_STATE}
 *         sentinel instance (compared by reference), {@code true} otherwise
 */
public boolean isStateRecovered() {
    final State current = this.indexState;
    if (current == State.UNRECOVERED_STATE) {
        return false;
    }
    return true;
}

public ElasticsearchException getUnavailableReason() {
final State localState = this.indexState;
if (localState.indexAvailable) {
Expand Down Expand Up @@ -297,7 +303,9 @@ public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, fin
public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
if (indexState == State.UNRECOVERED_STATE) {
consumer.accept(new ElasticsearchStatusException("State not yet recovered", RestStatus.SERVICE_UNAVAILABLE));
albertzaharovits marked this conversation as resolved.
Show resolved Hide resolved
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
Expand Down Expand Up @@ -377,6 +385,7 @@ public static boolean isIndexDeleted(State previousState, State currentState) {
* State of the security index.
*/
public static class State {
public static final State UNRECOVERED_STATE = new State(false, false, false, false, null, null, null);
public final boolean indexExists;
public final boolean isIndexUpToDate;
public final boolean indexAvailable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
Expand All @@ -40,10 +42,13 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.test.SecurityTestUtils;
Expand All @@ -55,6 +60,10 @@
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.TEMPLATE_VERSION_PATTERN;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -72,6 +81,7 @@ public void setUpManager() {
final Client mockClient = mock(Client.class);
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService());
when(mockClient.threadPool()).thenReturn(threadPool);
when(mockClient.settings()).thenReturn(Settings.EMPTY);
final ClusterService clusterService = mock(ClusterService.class);
Expand Down Expand Up @@ -192,6 +202,67 @@ public void testIndexHealthChangeListeners() throws Exception {
assertEquals(ClusterHealthStatus.GREEN, currentState.get().indexStatus);
}

/**
 * Checks that {@code prepareIndexIfNeededThenExecute} reports a SERVICE_UNAVAILABLE
 * {@link ElasticsearchStatusException} (and never runs the action) both before this test
 * delivers any cluster-changed event and after an event carrying the "state not recovered"
 * block, and that the action runs without error once a cluster state containing the index
 * has been delivered.
 */
public void testWriteBeforeStateNotRecovered() throws Exception {
    final AtomicBoolean prepareRunnableCalled = new AtomicBoolean(false);
    final AtomicReference<Exception> prepareException = new AtomicReference<>(null);
    final Runnable markRunnableCalled = () -> prepareRunnableCalled.set(true);

    // phase 1: no cluster-changed event delivered by this test yet
    manager.prepareIndexIfNeededThenExecute(prepareException::set, markRunnableCalled);
    final Exception failureBeforeAnyEvent = prepareException.get();
    assertThat(failureBeforeAnyEvent, is(notNullValue()));
    assertThat(failureBeforeAnyEvent, instanceOf(ElasticsearchStatusException.class));
    assertThat(((ElasticsearchStatusException) failureBeforeAnyEvent).status(), is(RestStatus.SERVICE_UNAVAILABLE));
    assertThat(prepareRunnableCalled.get(), is(false));
    prepareException.set(null);
    prepareRunnableCalled.set(false);

    // phase 2: state not recovered
    final ClusterBlocks.Builder notRecoveredBlocks =
        ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
    manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME).blocks(notRecoveredBlocks)));
    manager.prepareIndexIfNeededThenExecute(prepareException::set, markRunnableCalled);
    final Exception failureWhileBlocked = prepareException.get();
    assertThat(failureWhileBlocked, is(notNullValue()));
    assertThat(failureWhileBlocked, instanceOf(ElasticsearchStatusException.class));
    assertThat(((ElasticsearchStatusException) failureWhileBlocked).status(), is(RestStatus.SERVICE_UNAVAILABLE));
    assertThat(prepareRunnableCalled.get(), is(false));
    prepareException.set(null);
    prepareRunnableCalled.set(false);

    // phase 3: state recovered with index
    final ClusterState.Builder recoveredState =
        createClusterState(INDEX_NAME, TEMPLATE_NAME, SecurityIndexManager.INTERNAL_INDEX_FORMAT);
    markShardsAvailable(recoveredState);
    manager.clusterChanged(event(recoveredState));
    manager.prepareIndexIfNeededThenExecute(prepareException::set, markRunnableCalled);
    assertThat(prepareException.get(), is(nullValue()));
    assertThat(prepareRunnableCalled.get(), is(true));
}

/**
 * Checks that a registered index state listener is not invoked, and the manager still reports
 * an unrecovered state, after a cluster-changed event carrying the "state not recovered"
 * block; and that both flip once a cluster state containing the index is delivered.
 */
public void testListeneredNotCalledBeforeStateNotRecovered() throws Exception {
    final AtomicBoolean listenerCalled = new AtomicBoolean(false);
    manager.addIndexStateListener((previous, current) -> listenerCalled.set(true));

    // state not recovered
    final ClusterBlocks.Builder notRecoveredBlocks =
        ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
    final ClusterState.Builder blockedState = new ClusterState.Builder(CLUSTER_NAME).blocks(notRecoveredBlocks);
    manager.clusterChanged(event(blockedState));
    assertThat(manager.isStateRecovered(), is(false));
    assertThat(listenerCalled.get(), is(false));

    // state recovered with index
    final ClusterState.Builder recoveredState =
        createClusterState(INDEX_NAME, TEMPLATE_NAME, SecurityIndexManager.INTERNAL_INDEX_FORMAT);
    markShardsAvailable(recoveredState);
    manager.clusterChanged(event(recoveredState));
    assertThat(manager.isStateRecovered(), is(true));
    assertThat(listenerCalled.get(), is(true));
}

public void testIndexOutOfDateListeners() throws Exception {
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
Expand Down Expand Up @@ -236,12 +307,14 @@ private void assertInitialState() {
assertThat(manager.indexExists(), Matchers.equalTo(false));
assertThat(manager.isAvailable(), Matchers.equalTo(false));
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(false));
assertThat(manager.isStateRecovered(), Matchers.equalTo(false));
}

/**
 * Asserts that the manager reports an existing index with up-to-date mappings and a
 * recovered state, while the index itself is not available.
 */
private void assertIndexUpToDateButNotAvailable() {
    final boolean exists = manager.indexExists();
    assertThat(exists, equalTo(true));

    final boolean available = manager.isAvailable();
    assertThat(available, equalTo(false));

    final boolean mappingCurrent = manager.isMappingUpToDate();
    assertThat(mappingCurrent, equalTo(true));

    final boolean recovered = manager.isStateRecovered();
    assertThat(recovered, equalTo(true));
}

public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
Expand Down