From 37730497fd7de8cbf66531c8d22b851a327adffc Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 28 Aug 2023 11:42:30 +0530 Subject: [PATCH] Remove static instance for PersistedStateRegistry Signed-off-by: Sooraj Sinha --- .../cluster/coordination/Coordinator.java | 7 ++++-- .../coordination/PersistedStateRegistry.java | 12 ++++------ .../opensearch/discovery/DiscoveryModule.java | 7 ++++-- .../opensearch/gateway/GatewayMetaState.java | 24 +++++++++++-------- .../main/java/org/opensearch/node/Node.java | 11 ++++++--- .../cluster/coordination/NodeJoinTests.java | 3 ++- .../discovery/DiscoveryModuleTests.java | 4 +++- .../GatewayMetaStatePersistedStateTests.java | 4 +++- .../snapshots/SnapshotResiliencyTests.java | 4 +++- .../AbstractCoordinatorTestCase.java | 3 ++- .../gateway/MockGatewayMetaState.java | 4 +++- 11 files changed, 53 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index abb55cbf9e936..049b60674929c 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -182,6 +182,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; + private final PersistedStateRegistry persistedStateRegistry; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -202,7 +203,8 @@ public Coordinator( Random random, RerouteService rerouteService, ElectionStrategy electionStrategy, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + PersistedStateRegistry persistedStateRegistry ) { this.settings = settings; this.transportService = transportService; @@ -287,6 +289,7 @@ public Coordinator( joinHelper::logLastFailedJoinAttempt ); this.nodeHealthService = nodeHealthService; + this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; } @@ -827,7 +830,7 @@ protected void doStart() { getLocalNode(), persistedState, electionStrategy, - PersistedStateRegistry.getPersistedState(PersistedStateType.REMOTE), + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE), settings ) ); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java index 5b1a23ac7b56b..b214c33c87964 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateRegistry.java @@ -20,9 +20,7 @@ */ public class PersistedStateRegistry { - private static final PersistedStateRegistry INSTANCE = new PersistedStateRegistry(); - - private PersistedStateRegistry() {} + public PersistedStateRegistry() {} /** * Distinct Types PersistedState which can be present on a node @@ -34,13 +32,13 @@ public enum PersistedStateType { private final Map persistedStates = new ConcurrentHashMap<>(); - public static void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) { - PersistedState existingState = INSTANCE.persistedStates.putIfAbsent(persistedStateType, persistedState); + public void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) { + PersistedState existingState = this.persistedStates.putIfAbsent(persistedStateType, persistedState); assert existingState == null : "should only be set once, but already have " + existingState; } - public static PersistedState getPersistedState(PersistedStateType persistedStateType) { - return INSTANCE.persistedStates.get(persistedStateType); + public PersistedState getPersistedState(PersistedStateType persistedStateType) { + return this.persistedStates.get(persistedStateType); } } diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 68fce4d9b9bb4..58d8fe2e17fcf 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.ElectionStrategy; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AllocationService; @@ -129,7 +130,8 @@ public DiscoveryModule( Path configFile, GatewayMetaState gatewayMetaState, RerouteService rerouteService, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + PersistedStateRegistry persistedStateRegistry ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -205,7 +207,8 @@ public DiscoveryModule( new Random(Randomness.get().nextLong()), rerouteService, electionStrategy, - nodeHealthService + nodeHealthService, + persistedStateRegistry ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index d55ac1d08ca35..dd6df5a210071 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -104,14 +104,16 @@ public class GatewayMetaState implements Closeable { */ public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG"; + private PersistedStateRegistry persistedStateRegistry; + public PersistedState getPersistedState() { - final PersistedState persistedState = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); + final PersistedState persistedState = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); assert persistedState != null : "not started"; return persistedState; } public Metadata getMetadata() { - return PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata(); + return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata(); } public void start( @@ -122,10 +124,12 @@ public void start( MetadataIndexUpgradeService metadataIndexUpgradeService, MetadataUpgrader metadataUpgrader, PersistedClusterStateService persistedClusterStateService, - RemoteClusterStateService remoteClusterStateService + RemoteClusterStateService remoteClusterStateService, + PersistedStateRegistry persistedStateRegistry ) { - assert PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) == null : "should only start once, but already have " - + PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); + this.persistedStateRegistry = persistedStateRegistry; + assert persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) == null : "should only start once, but already have " + + persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.isDataNode(settings)) { try { @@ -189,9 +193,9 @@ public void start( } } - PersistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); if (remotePersistedState != null) { - PersistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState); + persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState); } } catch (IOException e) { throw new OpenSearchException("failed to load metadata", e); @@ -219,7 +223,7 @@ public void start( throw new UncheckedIOException(e); } } - PersistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(currentTerm, clusterState)); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(currentTerm, clusterState)); } } @@ -338,12 +342,12 @@ public void applyClusterState(ClusterChangedEvent event) { @Override public void close() throws IOException { - IOUtils.close(PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)); + IOUtils.close(persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)); } // visible for testing public boolean allPendingAsyncStatesWritten() { - final PersistedState ps = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); + final PersistedState ps = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL); if (ps instanceof AsyncLucenePersistedState) { return ((AsyncLucenePersistedState) ps).allPendingAsyncStatesWritten(); } else { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 8df36ad2b5a2e..a6abac3fa6b06 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -884,6 +884,7 @@ protected Node( client, identityService ); + final PersistedStateRegistry persistedStateRegistry = new PersistedStateRegistry(); final GatewayMetaState gatewayMetaState = new GatewayMetaState(); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); final SearchTransportService searchTransportService = new SearchTransportService( @@ -987,7 +988,8 @@ protected Node( environment.configDir(), gatewayMetaState, rerouteService, - fsHealthService + fsHealthService, + persistedStateRegistry ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, @@ -1166,6 +1168,7 @@ protected Node( b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); b.bind(RemoteClusterStateService.class).toInstance(remoteClusterStateService); + b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); }); injector = modules.createInjector(); @@ -1314,7 +1317,8 @@ public Node start() throws NodeValidationException { injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class), injector.getInstance(PersistedClusterStateService.class), - injector.getInstance(RemoteClusterStateService.class) + injector.getInstance(RemoteClusterStateService.class), + injector.getInstance(PersistedStateRegistry.class) ); if (Assertions.ENABLED) { try { @@ -1333,7 +1337,8 @@ public Node start() throws NodeValidationException { } // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state. - final Metadata onDiskMetadata = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) + final PersistedStateRegistry persistedStateRegistry = injector.getInstance(PersistedStateRegistry.class); + final Metadata onDiskMetadata = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) .getLastAcceptedState() .metadata(); assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index ab91099cae11f..13577ae3c17f9 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -260,7 +260,8 @@ protected void onSendRequest( random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE, - nodeHealthService + nodeHealthService, + new PersistedStateRegistry() ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 4a987c9a6fe02..b32dd7c6c240b 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -34,6 +34,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.Coordinator; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.service.ClusterApplier; @@ -120,7 +121,8 @@ private DiscoveryModule newModule(Settings settings, List plugi createTempDir().toAbsolutePath(), gatewayMetaState, mock(RerouteService.class), - null + null, + new PersistedStateRegistry() ); } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index e225bd3f422ee..0b28eaa5381df 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -41,6 +41,7 @@ import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.coordination.CoordinationState; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Manifest; import org.opensearch.cluster.metadata.Metadata; @@ -450,7 +451,8 @@ public void testDataOnlyNodePersistence() throws Exception { null, null, persistedClusterStateService, - remoteClusterStateService + remoteClusterStateService, + new PersistedStateRegistry() ); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4f7697660096e..6760d8f289096 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -130,6 +130,7 @@ import org.opensearch.cluster.coordination.ElectionStrategy; import org.opensearch.cluster.coordination.InMemoryPersistedState; import org.opensearch.cluster.coordination.MockSinglePrioritizingExecutor; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -2506,7 +2507,8 @@ public void start(ClusterState initialState) { random(), rerouteService, ElectionStrategy.DEFAULT_INSTANCE, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + new PersistedStateRegistry() ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 8fac407547a9d..331e0b877f84e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1144,7 +1144,8 @@ protected Optional getDisruptableMockTransport(Transpo Randomness.get(), (s, p, r) -> {}, getElectionStrategy(), - nodeHealthService + nodeHealthService, + new PersistedStateRegistry() ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( diff --git a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java index 10940e775ac8d..76674292c0d03 100644 --- a/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/opensearch/gateway/MockGatewayMetaState.java @@ -33,6 +33,7 @@ package org.opensearch.gateway; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.Manifest; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; @@ -127,7 +128,8 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L - ) + ), + new PersistedStateRegistry() ); } }