Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix security index auto-create and state recovery race #39582

Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -105,6 +107,11 @@ public boolean indexExists() {
return this.indexState.indexExists;
}

public boolean isStateRecovered() {
// as soon as state recovers we know the name of the .security index
return this.indexState.concreteIndexName != null;
}

/**
* Returns whether the index is on the current format if it exists. If the index does not exist
* we treat the index as up to date as we expect it to be created with the current format.
Expand Down Expand Up @@ -161,14 +168,17 @@ public void clusterChanged(ClusterChangedEvent event) {
final Version mappingVersion = oldestIndexMappingVersion(event.state());
final ClusterHealthStatus indexStatus = indexMetaData == null ? null :
new ClusterIndexHealth(indexMetaData, event.state().getRoutingTable().index(indexMetaData.getIndex())).getStatus();
// index name non-null iff state recovered
final String concreteIndexName = indexMetaData == null ? INTERNAL_SECURITY_INDEX : indexMetaData.getIndex().getName();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, I would've opted for a new state flag isStateRecovered, but given the amount of flags already existing I think concreteIndexName is a fitting substitute because the index name is also semantically (not only practically) linked to the state recovery status - it's easy to reason that we can't name the security index until the state containing all the index names has been recovered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer a new flag.
I think it's too easy to miss that the concreteIndexName variable has additional, non-obvious semantics, and change the default constructor to assign INTERNAL_SECURITY_INDEX to that field.
It's really random that I didn't do that when I introduced the index name field.

As the code stands, you hanging this protection off the fact that we

  1. happen to set that variable to null at construction,
  2. don't (currently) assign a value to it until we get past the STATE_NOT_RECOVERED_BLOCK
  3. always set it to a non-null value once we've recovered.

I don't think you can be sure that we will never change this implementation to behave differently and break one of those pre-conditions.
If we're worried about the number of fields here, we can switch all the booleans to a BitSet, but I don't think it's actually necessary to re-use fields for a purpose other than what they were intended.

final State newState = new State(indexExists, isIndexUpToDate, indexAvailable, mappingIsUpToDate, mappingVersion, concreteIndexName,
indexStatus);
this.indexState = newState;

if (newState.equals(previousState) == false) {
for (BiConsumer<State, State> listener : stateChangeListeners) {
listener.accept(previousState, newState);
// point in time iterator
final Iterator<BiConsumer<State, State>> stateListenerIterator = stateChangeListeners.iterator();
while (stateListenerIterator.hasNext()) {
stateListenerIterator.next().accept(previousState, newState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell this change doesn't actually do anything.
A for loop is just syntactic sugar over iterator() and next(), you haven't change the semantics. The stateListenerIterator is still backed by the same List and is at risk of ConcurrentModificationException issues if a listener is added/removed.

Assuming that's the problem you're trying to solve, then the options come down to:

  1. synchronize whenever we use the list
  2. switch the list implementation to CopyOnWriteArrayList
  3. make a copy of the list before iterating over it

(2) is probably the best option. I don't think we add listeners very often, so making a list copy each time is probably OK, but you'd need to audit the places we modify the list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell this change doesn't actually do anything.

It is a rewrite that unwraps the syntactic sugar because I find it confusing to use that when the list is concurrently modified.

The list is already of the CopyOnWriteArrayList type.

}
}
}
Expand Down Expand Up @@ -281,7 +291,9 @@ private static Version readMappingVersion(String indexName, MappingMetaData mapp
*/
public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
if (false == isStateRecovered()) {
delayUntilStateRecovered(consumer, () -> checkIndexVersionThenExecute(consumer, andThen));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This worries me.
We're suddenly adding in a queue of pending tasks to run when the recovery block is removed, but I don't see any analysis on how big that queue could get, and what the impact is on the thread pool when it happens.

We don't do that now. We don't queue if the security index is red, we currently fail if the index is not recovered.
There's not enough information in this PR to know whether or not it's a sensible thing to do, and how we'd protect ourselves from getting into trouble with it (e.g. too many queued tasks, or delayed read + writes because recovery was slow).

From a usability point of view it's a nice idea, but it feels like we're jumping in without thinking it through.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, understood! I just wonder if there is some alternative for "delaying requests" that would be acceptable beyond feature freeze?

} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
Expand All @@ -297,14 +309,17 @@ public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, fin
public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
if (false == isStateRecovered()) {
delayUntilStateRecovered(consumer, () -> prepareIndexIfNeededThenExecute(consumer, andThen));
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
} else if (indexState.indexExists == false) {
LOGGER.info("security index does not exist. Creating [{}] with alias [{}]", INTERNAL_SECURITY_INDEX, SECURITY_INDEX_NAME);
assert INTERNAL_SECURITY_INDEX.equals(indexState.concreteIndexName);
LOGGER.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, SECURITY_INDEX_NAME);
Tuple<String, Settings> mappingAndSettings = loadMappingAndSettingsSourceFromTemplate();
CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX)
CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName)
.alias(new Alias(SECURITY_INDEX_NAME))
.mapping("doc", mappingAndSettings.v1(), XContentType.JSON)
.waitForActiveShards(ActiveShardCount.ALL)
Expand Down Expand Up @@ -373,6 +388,41 @@ public static boolean isIndexDeleted(State previousState, State currentState) {
return previousState.indexStatus != null && currentState.indexStatus == null;
}

/**
* Delay the {@code runnable} invocation until cluster state recovered.
*/
private void delayUntilStateRecovered(final Consumer<Exception> consumer, final Runnable runnable) {
final AtomicBoolean done = new AtomicBoolean(false);
// context preserving one-shot runnable
final Runnable delayedRunnable = client.threadPool().getThreadContext().preserveContext(() -> {
if (done.compareAndSet(false, true)) {
runnable.run();
}
});
final BiConsumer<State, State> gatewayRecoveryListener = new BiConsumer<State, State>() {
@Override
public void accept(State prevState, State newState) {
assert isStateRecovered() : "State listener is notified for updates only after state recovered.";
assert newState.concreteIndexName != null : "The newly applied state following a recovery should name the .security index";
if (newState.concreteIndexName != null) {
stateChangeListeners.remove(this);
client.threadPool().generic().execute(delayedRunnable);
} else {
// any cluster state update is an indication that the state recovered
consumer.accept(new IllegalStateException("State has been recovered, but the security index name is unknown."));
}
}
};
// enqueue and wait for the first cluster state update
stateChangeListeners.add(gatewayRecoveryListener);
// maybe state recovered in the meantime since we last checked
if (isStateRecovered()) {
// state indeed recovered and we _might_ have lost the notification
stateChangeListeners.remove(gatewayRecoveryListener);
delayedRunnable.run();
}
}

/**
* State of the security index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
Expand All @@ -40,8 +41,10 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -55,6 +58,7 @@
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.TEMPLATE_VERSION_PATTERN;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -72,6 +76,7 @@ public void setUpManager() {
final Client mockClient = mock(Client.class);
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService());
when(mockClient.threadPool()).thenReturn(threadPool);
when(mockClient.settings()).thenReturn(Settings.EMPTY);
final ClusterService clusterService = mock(ClusterService.class);
Expand Down Expand Up @@ -192,6 +197,33 @@ public void testIndexHealthChangeListeners() throws Exception {
assertEquals(ClusterHealthStatus.GREEN, currentState.get().indexStatus);
}

public void testListeneredNotCalledBeforeStateNotRecovered() throws Exception {
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
manager.addIndexStateListener((prev, current) -> {
listenerCalled.set(true);
});
final AtomicBoolean prepareCalled = new AtomicBoolean(false);
manager.prepareIndexIfNeededThenExecute(c -> {}, () -> {
prepareCalled.set(true);
});
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
// state not recovered
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME).blocks(blocks)));
assertThat(listenerCalled.get(), is(false));
assertThat(prepareCalled.get(), is(false));
// state still not recovered
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME).blocks(blocks)));
assertThat(listenerCalled.get(), is(false));
assertThat(prepareCalled.get(), is(false));
// state recovered with index
ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME,
SecurityIndexManager.INTERNAL_INDEX_FORMAT);
markShardsAvailable(clusterStateBuilder);
manager.clusterChanged(event(clusterStateBuilder));
assertThat(listenerCalled.get(), is(true));
assertThat(prepareCalled.get(), is(true));
}

public void testIndexOutOfDateListeners() throws Exception {
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
manager.clusterChanged(event(new ClusterState.Builder(CLUSTER_NAME)));
Expand Down Expand Up @@ -236,12 +268,14 @@ private void assertInitialState() {
assertThat(manager.indexExists(), Matchers.equalTo(false));
assertThat(manager.isAvailable(), Matchers.equalTo(false));
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(false));
assertThat(manager.isStateRecovered(), Matchers.equalTo(false));
}

private void assertIndexUpToDateButNotAvailable() {
assertThat(manager.indexExists(), Matchers.equalTo(true));
assertThat(manager.isAvailable(), Matchers.equalTo(false));
assertThat(manager.isMappingUpToDate(), Matchers.equalTo(true));
assertThat(manager.isStateRecovered(), Matchers.equalTo(true));
}

public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
Expand Down