Skip to content

Commit

Permalink
Upload all index metadata to remote
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Aug 25, 2023
1 parent a4024e7 commit be36097
Show file tree
Hide file tree
Showing 8 changed files with 1,173 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -660,7 +661,11 @@ public void apply(Settings value, Settings current, Settings previous) {

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING
)
)
);
Expand Down
121 changes: 112 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataMarker;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -84,19 +86,19 @@
/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
* the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
* ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and
* non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster.
* When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being
* loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be
* stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes
* receive the real cluster state from the elected cluster-manager after joining the cluster.
*
* @opensearch.internal
*/
public class GatewayMetaState implements Closeable {

/**
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially
* stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
* restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
* Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since
* it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a
* cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

Expand Down Expand Up @@ -234,8 +236,8 @@ Metadata upgradeMetadataForNode(
}

/**
* This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current
* version. The MetadataIndexUpgradeService might also update obsolete settings if needed.
* This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current version. The MetadataIndexUpgradeService
* might also update obsolete settings if needed.
*
* @return input <code>metadata</code> if no upgrade is needed or an upgraded metadata
*/
Expand Down Expand Up @@ -599,4 +601,105 @@ public void close() throws IOException {
IOUtils.close(persistenceWriter.getAndSet(null));
}
}

/**
* Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}.
*/
public static class RemotePersistedState implements PersistedState {

private ClusterState lastAcceptedState;
private ClusterMetadataMarker lastAcceptedMarker;
private final RemoteClusterStateService remoteClusterStateService;

public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) {
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
public long getCurrentTerm() {
return lastAcceptedState != null ? lastAcceptedState.term() : 0L;
}

@Override
public ClusterState getLastAcceptedState() {
if (lastAcceptedMarker != null) {
assert lastAcceptedState != null : "Last accepted state is not set";
assert lastAcceptedState.metadata().indices().size() == lastAcceptedMarker.getIndices().size()
: "Number of indices in last accepted state and marker are different";
lastAcceptedMarker.getIndices().stream().forEach(md -> {
assert lastAcceptedState.metadata().indices().containsKey(md.getIndexName())
: "Last accepted state and marker are not in sync";
assert lastAcceptedState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID())
: "Last accepted state and marker are not in sync";
});
}
return lastAcceptedState;
}

@Override
public void setCurrentTerm(long currentTerm) {
// no-op
// For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes.
// But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required.
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
lastAcceptedState = clusterState;
return;
}
final ClusterMetadataMarker marker;
if (shouldWriteFullClusterState(clusterState)) {
marker = remoteClusterStateService.writeFullMetadata(clusterState);
} else {
marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker);
}
lastAcceptedMarker = marker;
lastAcceptedState = clusterState;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedMarker == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedMarker.getOpensearchVersion() != Version.CURRENT) {
return true;
}
return false;
}

@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedMarker == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
return;
}
final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedMarker
);
lastAcceptedMarker = committedMarker;
} catch (Exception e) {
handleExceptionOnWrite(e);
}
}

@Override
public void close() throws IOException {
PersistedState.super.close();
}

private void handleExceptionOnWrite(Exception e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
}
Loading

0 comments on commit be36097

Please sign in to comment.