Skip to content

Commit

Permalink
Restore cluster metadata during bootstrap (#9831) (#9900)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0bc1d5b)

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 5769006 commit 5f08608
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 143 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added crypto-kms plugin to provide AWS KMS based key providers for encryption/decryption. ([#8465](https://github.com/opensearch-project/OpenSearch/pull/8465))
- [Remote state] Integrate remote cluster state in publish/commit flow ([#9665](https://github.com/opensearch-project/OpenSearch/pull/9665))
- [Remote Store] Changes to introduce repository registration during bootstrap via node attributes. ([#9105](https://github.com/opensearch-project/OpenSearch/pull/9105))
- [Remote state] Auto restore index metadata from last known cluster state ([#9831](https://github.com/opensearch-project/OpenSearch/pull/9831))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
42 changes: 26 additions & 16 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoryMissingException;
Expand Down Expand Up @@ -127,7 +129,8 @@ public void start(
MetadataUpgrader metadataUpgrader,
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService,
PersistedStateRegistry persistedStateRegistry
PersistedStateRegistry persistedStateRegistry,
RemoteStoreRestoreService remoteStoreRestoreService
) {
assert this.persistedStateRegistry == null : "Persisted state registry should only be set once";
this.persistedStateRegistry = persistedStateRegistry;
Expand Down Expand Up @@ -155,7 +158,7 @@ public void start(
PersistedState remotePersistedState = null;
boolean success = false;
try {
final ClusterState clusterState = prepareInitialClusterState(
ClusterState clusterState = prepareInitialClusterState(
transportService,
clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
Expand All @@ -165,10 +168,28 @@ public void start(
);

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
if (isRemoteStoreClusterStateEnabled(settings)) {
// If the cluster UUID loaded from local is unknown (_na_) then fetch the best state from remote
// If there is no valid state on remote, continue with initial empty state
// If there is a valid state, then restore index metadata using this state
if (ClusterState.UNKNOWN_UUID.equals(clusterState.metadata().clusterUUID())) {
String lastKnownClusterUUID = remoteClusterStateService.getLastKnownUUIDFromRemote(
clusterState.getClusterName().value()
);
if (!ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID)) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
clusterState,
lastKnownClusterUUID,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
} else {
persistedState = new AsyncLucenePersistedState(
settings,
Expand Down Expand Up @@ -652,12 +673,6 @@ public void setCurrentTerm(long currentTerm) {
@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.info("Cluster is not yet ready to publish state to remote store");
lastAcceptedState = clusterState;
return;
}
final ClusterMetadataManifest manifest;
if (shouldWriteFullClusterState(clusterState)) {
manifest = remoteClusterStateService.writeFullMetadata(clusterState);
Expand Down Expand Up @@ -707,13 +722,8 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState) {
@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.trace("Cluster is not yet ready to publish state to remote store");
return;
}
assert lastAcceptedState != null : "Last accepted state is not present";
assert lastAcceptedManifest != null : "Last accepted manifest is not present";
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedManifest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -136,14 +137,20 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
ensureRepositorySet();

// should fetch the previous cluster UUID before writing full cluster state.
// Whenever a new election happens, a new leader will be elected and it might have stale previous UUID
final String previousClusterUUID = fetchPreviousClusterUUID(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);

// any validations before/after upload ?
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values())
);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, false);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -220,7 +227,12 @@ public ClusterMetadataManifest writeIncrementalMetadata(
for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
final ClusterMetadataManifest manifest = uploadManifest(clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), false);
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
new ArrayList<>(allUploadedIndexMetadata.values()),
previousManifest.getPreviousClusterUUID(),
false
);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -333,13 +345,13 @@ private void writeIndexMetadataAsync(
clusterState.metadata().clusterUUID(),
indexMetadata.getIndexUUID()
);

final String indexMetadataFilename = indexMetadataFileName(indexMetadata);
ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataContainer.path().buildAsString() + indexMetadataFileName(indexMetadata)
indexMetadataContainer.path().buildAsString() + indexMetadataFilename
)
),
ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex))
Expand All @@ -348,7 +360,7 @@ private void writeIndexMetadataAsync(
INDEX_METADATA_FORMAT.writeAsync(
indexMetadata,
indexMetadataContainer,
indexMetadataFileName(indexMetadata),
indexMetadataFilename,
blobStoreRepository.getCompressor(),
completionListener
);
Expand All @@ -363,7 +375,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
}
assert clusterState != null : "Last accepted cluster state is not set";
assert previousManifest != null : "Last cluster metadata manifest is not set";
return uploadManifest(clusterState, previousManifest.getIndices(), true);
return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), true);
}

@Override
Expand All @@ -373,12 +385,8 @@ public void close() throws IOException {
}
}

// Visible for testing
void ensureRepositorySet() {
if (blobStoreRepository != null) {
return;
}
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
Expand All @@ -391,6 +399,7 @@ void ensureRepositorySet() {
private ClusterMetadataManifest uploadManifest(
ClusterState clusterState,
List<UploadedIndexMetadata> uploadedIndexMetadata,
String previousClusterUUID,
boolean committed
) throws IOException {
synchronized (this) {
Expand All @@ -404,8 +413,7 @@ private ClusterMetadataManifest uploadManifest(
nodeId,
committed,
uploadedIndexMetadata,
// todo Change this to proper cluster UUID
ClusterState.UNKNOWN_UUID
previousClusterUUID
);
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return manifest;
Expand All @@ -418,6 +426,16 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust
CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor());
}

private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) {
final Optional<ClusterMetadataManifest> latestManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!latestManifest.isPresent()) {
final String previousClusterUUID = getLastKnownUUIDFromRemote(clusterName);
assert !clusterUUID.equals(previousClusterUUID) : "Last cluster UUID is same current cluster UUID";
return previousClusterUUID;
}
return latestManifest.get().getPreviousClusterUUID();
}

private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
Expand Down Expand Up @@ -484,12 +502,15 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
ensureRepositorySet();
start();
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException("Latest index metadata is not present for the provided clusterUUID");
}
assert Objects.equals(clusterUUID, clusterMetadataManifest.get().getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.get().getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
Expand Down Expand Up @@ -527,9 +548,12 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
String latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
return fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName);
public Optional<ClusterMetadataManifest> getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
Optional<String> latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
if (latestManifestFileName.isPresent()) {
return Optional.of(fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName.get()));
}
return Optional.empty();
}

/**
Expand All @@ -538,7 +562,7 @@ public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterNa
* @param clusterName The cluster name for which previous cluster UUID is to be fetched
* @return Last valid cluster UUID
*/
public String getLatestClusterUUID(String clusterName) {
public String getLastKnownUUIDFromRemote(String clusterName) {
try {
Set<String> clusterUUIDs = getAllClusterUUIDs(clusterName);
Map<String, ClusterMetadataManifest> latestManifests = getLatestManifestForAllClusterUUIDs(clusterName, clusterUUIDs);
Expand Down Expand Up @@ -566,8 +590,8 @@ private Map<String, ClusterMetadataManifest> getLatestManifestForAllClusterUUIDs
Map<String, ClusterMetadataManifest> manifestsByClusterUUID = new HashMap<>();
for (String clusterUUID : clusterUUIDs) {
try {
ClusterMetadataManifest manifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
manifestsByClusterUUID.put(clusterUUID, manifest);
Optional<ClusterMetadataManifest> manifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
manifestsByClusterUUID.put(clusterUUID, manifest.get());
} catch (Exception e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID)
Expand Down Expand Up @@ -627,7 +651,7 @@ private boolean isInvalidClusterUUID(ClusterMetadataManifest manifest) {
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
*/
private String getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
private Optional<String> getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
try {
/**
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file
Expand All @@ -640,13 +664,13 @@ private String getLatestManifestFileName(String clusterName, String clusterUUID)
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return manifestFilesMetadata.get(0).name();
return Optional.of(manifestFilesMetadata.get(0).name());
}
} catch (IOException e) {
throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e);
}

throw new IllegalStateException(String.format(Locale.ROOT, "Remote Cluster State not found - %s", clusterUUID));
logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID);
return Optional.empty();
}

/**
Expand Down
7 changes: 6 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,10 @@ public Node start() throws NodeValidationException {
injector.getInstance(PeerRecoverySourceService.class).start();
injector.getInstance(SegmentReplicationSourceService.class).start();

final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class);
if (remoteClusterStateService != null) {
remoteClusterStateService.start();
}
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(
Expand All @@ -1347,7 +1351,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(MetadataUpgrader.class),
injector.getInstance(PersistedClusterStateService.class),
injector.getInstance(RemoteClusterStateService.class),
injector.getInstance(PersistedStateRegistry.class)
injector.getInstance(PersistedStateRegistry.class),
injector.getInstance(RemoteStoreRestoreService.class)
);
if (Assertions.ENABLED) {
try {
Expand Down
Loading

0 comments on commit 5f08608

Please sign in to comment.