From 65366e0d1faab4e9d88cfb94e477848950552ab6 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 9 Jan 2020 14:49:56 +0100 Subject: [PATCH] Write CS asynchronously on data-only nodes (#50782) Writes cluster states out asynchronously on data-only nodes. The main reason for writing out the cluster state at all is so that the data-only nodes can snap into a cluster, so that they can do a bit of bootstrap validation, and so that the shard recovery tools work. Cluster states that are written asynchronously have their voting configuration adapted to a non-existent configuration so that these nodes cannot mistakenly become master even if their node role is changed back and forth. Relates #48701 --- .../gateway/GatewayMetaState.java | 166 +++++++++++++++++- .../UnsafeBootstrapAndDetachCommandIT.java | 3 + .../GatewayMetaStatePersistedStateTests.java | 100 +++++++++++ .../RemoveCorruptedShardDataCommandIT.java | 4 + 4 files changed, 269 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index fa1ddbea7a8ff..c5989f60d1e5c 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -20,11 +20,14 @@ package org.elasticsearch.gateway; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.CoordinationMetaData; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.InMemoryPersistedState; import
org.elasticsearch.cluster.metadata.IndexMetaData; @@ -37,21 +40,32 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeMetaData; +import org.elasticsearch.node.Node; import org.elasticsearch.plugins.MetaDataUpgrader; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; +import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; + /** * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts. 
* @@ -100,7 +114,7 @@ public void start(Settings settings, TransportService transportService, ClusterS } final PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter(); - final LucenePersistedState lucenePersistedState; + final PersistedState persistedState; boolean success = false; try { final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService, @@ -108,8 +122,12 @@ public void start(Settings settings, TransportService transportService, ClusterS .version(lastAcceptedVersion) .metaData(upgradeMetaDataForNode(metaData, metaDataIndexUpgradeService, metaDataUpgrader)) .build()); - lucenePersistedState = new LucenePersistedState( - persistenceWriter, currentTerm, clusterState); + if (DiscoveryNode.isMasterNode(settings)) { + persistedState = new LucenePersistedState(persistenceWriter, currentTerm, clusterState); + } else { + persistedState = new AsyncLucenePersistedState(settings, transportService.getThreadPool(), + new LucenePersistedState(persistenceWriter, currentTerm, clusterState)); + } if (DiscoveryNode.isDataNode(settings)) { metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality) } else { @@ -125,7 +143,7 @@ public void start(Settings settings, TransportService transportService, ClusterS } } - persistedState.set(lucenePersistedState); + this.persistedState.set(persistedState); } catch (IOException e) { throw new ElasticsearchException("failed to load metadata", e); } @@ -227,6 +245,146 @@ public void close() throws IOException { IOUtils.close(persistedState.get()); } + // visible for testing + public boolean allPendingAsyncStatesWritten() { + final PersistedState ps = persistedState.get(); + if (ps instanceof AsyncLucenePersistedState) { + return ((AsyncLucenePersistedState) ps).allPendingAsyncStatesWritten(); + } else { + return true; + } + } + + static class AsyncLucenePersistedState extends InMemoryPersistedState { + + 
private static final Logger logger = LogManager.getLogger(AsyncLucenePersistedState.class); + + static final String THREAD_NAME = "AsyncLucenePersistedState#updateTask"; + + private final EsThreadPoolExecutor threadPoolExecutor; + private final PersistedState persistedState; + + boolean newCurrentTermQueued = false; + boolean newStateQueued = false; + + private final Object mutex = new Object(); + + AsyncLucenePersistedState(Settings settings, ThreadPool threadPool, PersistedState persistedState) { + super(persistedState.getCurrentTerm(), persistedState.getLastAcceptedState()); + final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); + threadPoolExecutor = EsExecutors.newFixed( + nodeName + "/" + THREAD_NAME, + 1, 1, + daemonThreadFactory(nodeName, THREAD_NAME), + threadPool.getThreadContext()); + this.persistedState = persistedState; + } + + @Override + public void setCurrentTerm(long currentTerm) { + synchronized (mutex) { + super.setCurrentTerm(currentTerm); + if (newCurrentTermQueued) { + logger.trace("term update already queued (setting term to {})", currentTerm); + } else { + logger.trace("queuing term update (setting term to {})", currentTerm); + newCurrentTermQueued = true; + scheduleUpdate(); + } + } + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + synchronized (mutex) { + super.setLastAcceptedState(clusterState); + if (newStateQueued) { + logger.trace("cluster state update already queued (setting cluster state to {})", clusterState.version()); + } else { + logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version()); + newStateQueued = true; + scheduleUpdate(); + } + } + } + + private void scheduleUpdate() { + assert Thread.holdsLock(mutex); + try { + threadPoolExecutor.execute(new AbstractRunnable() { + + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred when storing new meta data", e); + } + + @Override + protected 
void doRun() { + final Long term; + final ClusterState clusterState; + synchronized (mutex) { + if (newCurrentTermQueued) { + term = getCurrentTerm(); + newCurrentTermQueued = false; + } else { + term = null; + } + if (newStateQueued) { + clusterState = getLastAcceptedState(); + newStateQueued = false; + } else { + clusterState = null; + } + } + // write current term before last accepted state so that it is never below term in last accepted state + if (term != null) { + persistedState.setCurrentTerm(term); + } + if (clusterState != null) { + persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState)); + } + } + }); + } catch (EsRejectedExecutionException e) { + // ignore cases where we are shutting down..., there is really nothing interesting to be done here... + if (threadPoolExecutor.isShutdown() == false) { + assert false : "only expect rejections when shutting down"; + throw e; + } + } + } + + static final CoordinationMetaData.VotingConfiguration staleStateConfiguration = + new CoordinationMetaData.VotingConfiguration(Collections.singleton("STALE_STATE_CONFIG")); + + static ClusterState resetVotingConfiguration(ClusterState clusterState) { + CoordinationMetaData newCoordinationMetaData = CoordinationMetaData.builder(clusterState.coordinationMetaData()) + .lastAcceptedConfiguration(staleStateConfiguration) + .lastCommittedConfiguration(staleStateConfiguration) + .build(); + return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData()) + .coordinationMetaData(newCoordinationMetaData).build()).build(); + } + + @Override + public void close() throws IOException { + try { + ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS); + } finally { + persistedState.close(); + } + } + + boolean allPendingAsyncStatesWritten() { + synchronized (mutex) { + if (newCurrentTermQueued || newStateQueued) { + return false; + } + return threadPoolExecutor.getActiveCount() == 0; + } + } + } + /** * Encapsulates the incremental 
writing of metadata to a {@link PersistedClusterStateService.Writer}. */ diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java index f6dbb9f1510f6..807da1e77cb99 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.gateway.PersistedClusterStateService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.Node; @@ -259,6 +260,7 @@ public void test3MasterNodes2Failed() throws Exception { logger.info("--> stop 1st master-eligible node and data-only node"); NodeEnvironment nodeEnvironment = internalCluster().getMasterNodeInstance(NodeEnvironment.class); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodes.get(0))); + assertBusy(() -> assertTrue(internalCluster().getInstance(GatewayMetaState.class, dataNode).allPendingAsyncStatesWritten())); internalCluster().stopRandomDataNode(); logger.info("--> unsafely-bootstrap 1st master-eligible node"); @@ -327,6 +329,7 @@ public void testAllMasterEligibleNodesFailedDanglingIndexImport() throws Excepti logger.info("--> stop data-only node and detach it from the old cluster"); Settings dataNodeDataPathSettings = internalCluster().dataPathSettings(dataNode); + assertBusy(() -> assertTrue(internalCluster().getInstance(GatewayMetaState.class, dataNode).allPendingAsyncStatesWritten())); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNode)); final Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(dataNodeDataPathSettings).build()); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 868836bb30f61..af7443055e06f 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -31,6 +31,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.set.Sets; @@ -38,15 +40,22 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.nio.file.Path; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class GatewayMetaStatePersistedStateTests extends ESTestCase { private NodeEnvironment nodeEnvironment; @@ -309,4 +318,95 @@ public void testStatePersistedOnLoad() throws IOException { } } + public void testDataOnlyNodePersistence() throws Exception { + DiscoveryNode localNode = new 
DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(), + Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT); + Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put( + Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build(); + final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode); + final TransportService transportService = mock(TransportService.class); + TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode"); + when(transportService.getThreadPool()).thenReturn(threadPool); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + final PersistedClusterStateService persistedClusterStateService = + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE); + gateway.start(settings, transportService, clusterService, + new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService); + final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); + assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class)); + + //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration + CoordinationMetaData coordinationMetaData; + do { + coordinationMetaData = createCoordinationMetaData(randomNonNegativeLong()); + } while (coordinationMetaData.getLastAcceptedConfiguration().equals(coordinationMetaData.getLastCommittedConfiguration())); + + ClusterState state = createClusterState(randomNonNegativeLong(), + MetaData.builder().coordinationMetaData(coordinationMetaData) + .clusterUUID(randomAlphaOfLength(10)).build()); + persistedState.setLastAcceptedState(state); + 
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten())); + + assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(), + not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration()))); + CoordinationMetaData persistedCoordinationMetaData = + persistedClusterStateService.loadBestOnDiskState().metaData.coordinationMetaData(); + assertThat(persistedCoordinationMetaData.getLastAcceptedConfiguration(), + equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration)); + assertThat(persistedCoordinationMetaData.getLastCommittedConfiguration(), + equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration)); + + persistedState.markLastAcceptedStateAsCommitted(); + assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten())); + + CoordinationMetaData expectedCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData) + .lastCommittedConfiguration(coordinationMetaData.getLastAcceptedConfiguration()).build(); + ClusterState expectedClusterState = + ClusterState.builder(state).metaData(MetaData.builder().coordinationMetaData(expectedCoordinationMetaData) + .clusterUUID(state.metaData().clusterUUID()).clusterUUIDCommitted(true).build()).build(); + + assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState()); + persistedCoordinationMetaData = persistedClusterStateService.loadBestOnDiskState().metaData.coordinationMetaData(); + assertThat(persistedCoordinationMetaData.getLastAcceptedConfiguration(), + equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration)); + assertThat(persistedCoordinationMetaData.getLastCommittedConfiguration(), + equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration)); + assertTrue(persistedClusterStateService.loadBestOnDiskState().metaData.clusterUUIDCommitted()); + + // generate a series of updates and check if batching works + final String indexName = randomAlphaOfLength(10); + long 
currentTerm = state.term(); + for (int i = 0; i < 1000; i++) { + if (rarely()) { + // bump term + currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L); + persistedState.setCurrentTerm(currentTerm); + } else { + // update cluster state + final int numberOfShards = randomIntBetween(1, 5); + final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm); + final IndexMetaData indexMetaData = createIndexMetaData(indexName, numberOfShards, i); + state = createClusterState(state.version() + 1, + MetaData.builder().coordinationMetaData(createCoordinationMetaData(term)).put(indexMetaData, false).build()); + persistedState.setLastAcceptedState(state); + } + } + assertEquals(currentTerm, persistedState.getCurrentTerm()); + assertClusterStateEqual(state, persistedState.getLastAcceptedState()); + assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten())); + + gateway.close(); + + try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) { + assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm()); + assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state), + reloadedPersistedState.getLastAcceptedState()); + assertNotNull(reloadedPersistedState.getLastAcceptedState().metaData().index(indexName)); + } + + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java index c4c620a74a826..fa1c070b28277 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java @@ -56,6 +56,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import 
org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; @@ -478,6 +479,9 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception { final Settings node1PathSettings = internalCluster().dataPathSettings(node1); final Settings node2PathSettings = internalCluster().dataPathSettings(node2); + assertBusy(() -> internalCluster().getInstances(GatewayMetaState.class) + .forEach(gw -> assertTrue(gw.allPendingAsyncStatesWritten()))); + // stop data nodes internalCluster().stopRandomDataNode(); internalCluster().stopRandomDataNode();