Skip to content

Commit

Permalink
Remove static instance for PersistedStateRegistry
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Aug 28, 2023
1 parent 1482c88 commit 3773049
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final NodeHealthService nodeHealthService;
private final PersistedStateRegistry persistedStateRegistry;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand All @@ -202,7 +203,8 @@ public Coordinator(
Random random,
RerouteService rerouteService,
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -287,6 +289,7 @@ public Coordinator(
joinHelper::logLastFailedJoinAttempt
);
this.nodeHealthService = nodeHealthService;
this.persistedStateRegistry = persistedStateRegistry;
this.localNodeCommissioned = true;
}

Expand Down Expand Up @@ -827,7 +830,7 @@ protected void doStart() {
getLocalNode(),
persistedState,
electionStrategy,
PersistedStateRegistry.getPersistedState(PersistedStateType.REMOTE),
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE),
settings
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
*/
public class PersistedStateRegistry {

private static final PersistedStateRegistry INSTANCE = new PersistedStateRegistry();

private PersistedStateRegistry() {}
public PersistedStateRegistry() {}

/**
* Distinct Types PersistedState which can be present on a node
Expand All @@ -34,13 +32,13 @@ public enum PersistedStateType {

private final Map<PersistedStateType, PersistedState> persistedStates = new ConcurrentHashMap<>();

public static void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) {
PersistedState existingState = INSTANCE.persistedStates.putIfAbsent(persistedStateType, persistedState);
public void addPersistedState(PersistedStateType persistedStateType, PersistedState persistedState) {
PersistedState existingState = this.persistedStates.putIfAbsent(persistedStateType, persistedState);
assert existingState == null : "should only be set once, but already have " + existingState;
}

public static PersistedState getPersistedState(PersistedStateType persistedStateType) {
return INSTANCE.persistedStates.get(persistedStateType);
public PersistedState getPersistedState(PersistedStateType persistedStateType) {
return this.persistedStates.get(persistedStateType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.ElectionStrategy;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -129,7 +130,8 @@ public DiscoveryModule(
Path configFile,
GatewayMetaState gatewayMetaState,
RerouteService rerouteService,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -205,7 +207,8 @@ public DiscoveryModule(
new Random(Randomness.get().nextLong()),
rerouteService,
electionStrategy,
nodeHealthService
nodeHealthService,
persistedStateRegistry
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
24 changes: 14 additions & 10 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,16 @@ public class GatewayMetaState implements Closeable {
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

private PersistedStateRegistry persistedStateRegistry;

public PersistedState getPersistedState() {
final PersistedState persistedState = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
final PersistedState persistedState = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
assert persistedState != null : "not started";
return persistedState;
}

public Metadata getMetadata() {
return PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata();
return persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).getLastAcceptedState().metadata();
}

public void start(
Expand All @@ -122,10 +124,12 @@ public void start(
MetadataIndexUpgradeService metadataIndexUpgradeService,
MetadataUpgrader metadataUpgrader,
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService
RemoteClusterStateService remoteClusterStateService,
PersistedStateRegistry persistedStateRegistry
) {
assert PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) == null : "should only start once, but already have "
+ PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
this.persistedStateRegistry = persistedStateRegistry;
assert persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) == null : "should only start once, but already have "
+ persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);

if (DiscoveryNode.isClusterManagerNode(settings) || DiscoveryNode.isDataNode(settings)) {
try {
Expand Down Expand Up @@ -189,9 +193,9 @@ public void start(
}
}

PersistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
if (remotePersistedState != null) {
PersistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState);
persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState);
}
} catch (IOException e) {
throw new OpenSearchException("failed to load metadata", e);
Expand Down Expand Up @@ -219,7 +223,7 @@ public void start(
throw new UncheckedIOException(e);
}
}
PersistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(currentTerm, clusterState));
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(currentTerm, clusterState));
}
}

Expand Down Expand Up @@ -338,12 +342,12 @@ public void applyClusterState(ClusterChangedEvent event) {

@Override
public void close() throws IOException {
IOUtils.close(PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL));
IOUtils.close(persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL));
}

// visible for testing
public boolean allPendingAsyncStatesWritten() {
final PersistedState ps = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
final PersistedState ps = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
if (ps instanceof AsyncLucenePersistedState) {
return ((AsyncLucenePersistedState) ps).allPendingAsyncStatesWritten();
} else {
Expand Down
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ protected Node(
client,
identityService
);
final PersistedStateRegistry persistedStateRegistry = new PersistedStateRegistry();
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(
Expand Down Expand Up @@ -987,7 +988,8 @@ protected Node(
environment.configDir(),
gatewayMetaState,
rerouteService,
fsHealthService
fsHealthService,
persistedStateRegistry
);
final SearchPipelineService searchPipelineService = new SearchPipelineService(
clusterService,
Expand Down Expand Up @@ -1166,6 +1168,7 @@ protected Node(
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
b.bind(RemoteClusterStateService.class).toInstance(remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
});
injector = modules.createInjector();

Expand Down Expand Up @@ -1314,7 +1317,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(MetadataIndexUpgradeService.class),
injector.getInstance(MetadataUpgrader.class),
injector.getInstance(PersistedClusterStateService.class),
injector.getInstance(RemoteClusterStateService.class)
injector.getInstance(RemoteClusterStateService.class),
injector.getInstance(PersistedStateRegistry.class)
);
if (Assertions.ENABLED) {
try {
Expand All @@ -1333,7 +1337,8 @@ public Node start() throws NodeValidationException {
}
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
final Metadata onDiskMetadata = PersistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
final PersistedStateRegistry persistedStateRegistry = injector.getInstance(PersistedStateRegistry.class);
final Metadata onDiskMetadata = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
.getLastAcceptedState()
.metadata();
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ protected void onSendRequest(
random,
(s, p, r) -> {},
ElectionStrategy.DEFAULT_INSTANCE,
nodeHealthService
nodeHealthService,
new PersistedStateRegistry()
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.service.ClusterApplier;
Expand Down Expand Up @@ -120,7 +121,8 @@ private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugi
createTempDir().toAbsolutePath(),
gatewayMetaState,
mock(RerouteService.class),
null
null,
new PersistedStateRegistry()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.coordination.CoordinationState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Manifest;
import org.opensearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -450,7 +451,8 @@ public void testDataOnlyNodePersistence() throws Exception {
null,
null,
persistedClusterStateService,
remoteClusterStateService
remoteClusterStateService,
new PersistedStateRegistry()
);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.opensearch.cluster.coordination.ElectionStrategy;
import org.opensearch.cluster.coordination.InMemoryPersistedState;
import org.opensearch.cluster.coordination.MockSinglePrioritizingExecutor;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.metadata.AliasValidator;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -2506,7 +2507,8 @@ public void start(ClusterState initialState) {
random(),
rerouteService,
ElectionStrategy.DEFAULT_INSTANCE,
() -> new StatusInfo(HEALTHY, "healthy-info")
() -> new StatusInfo(HEALTHY, "healthy-info"),
new PersistedStateRegistry()
);
clusterManagerService.setClusterStatePublisher(coordinator);
coordinator.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,8 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
Randomness.get(),
(s, p, r) -> {},
getElectionStrategy(),
nodeHealthService
nodeHealthService,
new PersistedStateRegistry()
);
clusterManagerService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.gateway;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.metadata.Manifest;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.MetadataIndexUpgradeService;
Expand Down Expand Up @@ -127,7 +128,8 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
() -> 0L
)
),
new PersistedStateRegistry()
);
}
}

0 comments on commit 3773049

Please sign in to comment.