Skip to content

Commit

Permalink
Remove cluster state initial customs (#32501)
Browse files Browse the repository at this point in the history
This infrastructure was introduced in #26144 and made obsolete in #30743
  • Loading branch information
ywelsch authored Aug 2, 2018
1 parent 097c428 commit db6e8c7
Show file tree
Hide file tree
Showing 20 changed files with 75 additions and 238 deletions.
17 changes: 0 additions & 17 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,6 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
}

public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
for (ClusterPlugin plugin : clusterPlugins) {
Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
for (String key : initialCustomSupplier.keySet()) {
if (customSupplier.containsKey(key)) {
throw new IllegalStateException("custom supplier key [" + key + "] is registered more than once");
}
}
customSupplier.putAll(initialCustomSupplier);
}
return Collections.unmodifiableMap(customSupplier);
}

public static List<Entry> getNamedWriteables() {
List<Entry> entries = new ArrayList<>();
// Cluster State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ public interface ClusterApplier {
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
ClusterState.Builder newClusterStateBuilder();

/**
* Listener for results of cluster state application
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,14 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final AtomicReference<ClusterState> state; // last applied state

private NodeConnectionsService nodeConnectionsService;
private Supplier<ClusterState.Builder> stateBuilderSupplier;

public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<ClusterState
.Builder> stateBuilderSupplier) {
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
this.stateBuilderSupplier = stateBuilderSupplier;
}

public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -652,8 +649,4 @@ protected long currentTimeInNanos() {
return System.nanoTime();
}

@Override
public ClusterState.Builder newClusterStateBuilder() {
return stateBuilderSupplier.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

public class ClusterService extends AbstractLifecycleComponent {

Expand All @@ -59,30 +58,16 @@ public class ClusterService extends AbstractLifecycleComponent {
private final OperationRouting operationRouting;

private final ClusterSettings clusterSettings;
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
this.initialClusterStateCustoms = initialClusterStateCustoms;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
}

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
public ClusterState.Builder newClusterStateBuilder() {
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
builder.putCustom(entry.getKey(), entry.getValue().get());
}
return builder;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -113,7 +114,7 @@ protected synchronized void doStart() {
}

protected ClusterState createInitialState(DiscoveryNode localNode) {
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
return builder.nodes(DiscoveryNodes.builder().add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(localNode.getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ protected void doStart() {
// set initial state
assert committedState.get() == null;
assert localNode != null;
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
ClusterState initialState = builder
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -147,7 +148,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
metaDataBuilder.transientSettings(),
e -> logUnknownSetting("transient", e),
(e, ex) -> logInvalidSetting("transient", e, ex)));
ClusterState.Builder builder = clusterService.newClusterStateBuilder();
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
builder.metaData(metaDataBuilder);
listener.onSuccess(builder.build());
}
Expand Down
3 changes: 1 addition & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));

List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.function.Supplier;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -66,12 +65,4 @@ default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings sett
default void onNodeStarted() {
}

/**
* Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate.
* This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published
* but customs are not initialized.
*
* TODO: Remove this whole concept of InitialClusterStateCustomSupplier, it's not used anymore
*/
default Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
public class ClusterModuleTests extends ModuleTestCase {
private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE;
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
static class FakeAllocationDecider extends AllocationDecider {
protected FakeAllocationDecider(Settings settings) {
super(settings);
Expand Down Expand Up @@ -202,57 +202,6 @@ public void testAllocationDeciderOrder() {
}
}

public void testCustomSuppliers() {
Map<String, Supplier<ClusterState.Custom>> customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.emptyList());
assertEquals(3, customSuppliers.size());
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));

customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
}));
assertEquals(4, customSuppliers.size());
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));
assertTrue(customSuppliers.containsKey("foo"));

{
// Eclipse Neon 2 didn't compile the plugins definition inside the lambda expression,
// probably due to https://bugs.eclipse.org/bugs/show_bug.cgi?id=511750, which is
// fixed in Eclipse Oxygon. Pulled out the plugins definition to make it work in older versions
List<ClusterPlugin> plugins = Collections.singletonList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap(SnapshotsInProgress.TYPE, () -> null);
}
});
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> ClusterModule.getClusterStateCustomSuppliers(plugins));
assertEquals(ise.getMessage(), "custom supplier key [snapshots] is registered more than once");
}
{
List<ClusterPlugin> plugins = Arrays.asList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
}, new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
});
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> ClusterModule.getClusterStateCustomSuppliers(plugins));
assertEquals(ise.getMessage(), "custom supplier key [foo] is registered more than once");
}
}

public void testPre63CustomsFiltering() {
final String whiteListedClusterCustom = randomFrom(ClusterModule.PRE_6_3_CLUSTER_CUSTOMS_WHITE_LIST);
final String whiteListedMetaDataCustom = randomFrom(ClusterModule.PRE_6_3_METADATA_CUSTOMS_WHITE_LIST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand All @@ -38,24 +39,30 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.CollectionAssertions;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateExists;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -279,13 +286,11 @@ public void testIndicesIgnoreUnavailableFalse() throws Exception {
}
}

public void testPrivateCustomsAreExcluded() {
public void testPrivateCustomsAreExcluded() throws Exception {
// ensure that the custom is injected into the cluster state
assertBusy(() -> assertTrue(clusterService().state().customs().containsKey("test")));
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setCustoms(true).get();
assertFalse(clusterStateResponse.getState().customs().containsKey("test"));
// just to make sure there is something
assertTrue(clusterStateResponse.getState().customs().containsKey(SnapshotDeletionsInProgress.TYPE));
ClusterState state = internalCluster().getInstance(ClusterService.class).state();
assertTrue(state.customs().containsKey("test"));
}

private static class TestCustom extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
Expand Down Expand Up @@ -333,17 +338,61 @@ public static class PrivateCustomPlugin extends Plugin implements ClusterPlugin

public PrivateCustomPlugin() {}

@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("test", () -> new TestCustom(1));
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, "test", TestCustom::new));
entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, "test", TestCustom::readDiffFrom));
return entries;
}

private final AtomicBoolean installed = new AtomicBoolean();

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
clusterService.addListener(event -> {
final ClusterState state = event.state();
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
return;
}

if (state.nodes().isLocalNodeElectedMaster()) {
if (state.custom("test") == null) {
if (installed.compareAndSet(false, true)) {
clusterService.submitStateUpdateTask("install-metadata-custom", new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.custom("test") == null) {
final ClusterState.Builder builder = ClusterState.builder(currentState);
builder.putCustom("test", new TestCustom(42));
return builder.build();
} else {
return currentState;
}
}

@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(e);
}

});
}
}
}

});
return Collections.emptyList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ static class TimedClusterApplierService extends ClusterApplierService {
public volatile Long currentTimeOverride = null;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)));
super(settings, clusterSettings, threadPool);
}

@Override
Expand Down
Loading

0 comments on commit db6e8c7

Please sign in to comment.