Skip to content

Commit

Permalink
Add dedicated string prefix for remote index metadata and remote rout…
Browse files Browse the repository at this point in the history
…ing table (opensearch-project#15575)

* Hashed prefix for index metadata

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Sep 5, 2024
1 parent 4f50b4d commit 1935650
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -48,19 +52,27 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";
private static final String INDEX_NAME = "test-index";
private static final String REMOTE_STATE_PREFIX = "!";
private static final String REMOTE_ROUTING_PREFIX = "_";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean hasRemoteStateCharPrefix;
private boolean hasRemoteRoutingCharPrefix;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
hasRemoteStateCharPrefix = randomBoolean();
hasRemoteRoutingCharPrefix = randomBoolean();
}

@Override
Expand All @@ -84,6 +96,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);

return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), isRemoteStateEnabled)
Expand All @@ -94,6 +107,19 @@ protected Settings nodeSettings(int nodeOrdinal) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
)
.put(
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX.getKey(),
hasRemoteRoutingCharPrefix ? REMOTE_ROUTING_PREFIX : ""
)
.put(RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX.toString())
.put(
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING.getKey(),
PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString()
)
.build();
}

Expand Down Expand Up @@ -137,6 +163,27 @@ public void testPublication() throws Exception {
Map<String, Integer> manifestFiles = getMetadataFiles(repository, RemoteClusterMetadataManifest.MANIFEST);
assertTrue(manifestFiles.containsKey(RemoteClusterMetadataManifest.MANIFEST));

RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
assertThat(manifest.getIndices().size(), is(1));
if (hasRemoteStateCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndices()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_STATE_PREFIX));
}
}
assertThat(manifest.getIndicesRouting().size(), is(1));
if (hasRemoteRoutingCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndicesRouting()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_ROUTING_PREFIX));
}
}

// get settings from each node and verify that it is updated
Settings settings = clusterService().getSettings();
logger.info("settings : {}", settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,17 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
internalCluster().stopAllNodes();
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
for (UploadedIndexMetadata md : manifest.getIndices()) {
Files.move(segmentRepoPath.resolve(md.getUploadedFilename()), segmentRepoPath.resolve("cluster-state/"));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ public String[] getBlobPathTokens() {

public abstract String generateBlobFileName();

/**
* Generate the blob path for the remote entity by adding a custom prefix.
* This custom prefix may be generated by any of the strategies defined in {@link org.opensearch.index.remote.RemoteStoreEnums}
* The default implementation returns the same path as passed in the argument.
* @param blobPath The remote path on which the remote entity is to be uploaded
* @return The modified remote path after adding a custom prefix at which the remote entity will be uploaded.
*/
public BlobPath getPrefixedPath(BlobPath blobPath) {
return blobPath;
}

public String clusterUUID() {
return clusterUUID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<T> obj) {
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
return obj.getPrefixedPath(blobPath);
}

public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity<T> obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteIndexMetadataManager;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -736,13 +737,17 @@ public void apply(Settings value, Settings current, Settings previous) {
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING,
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING,
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX,

// Admission Control Settings
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ public class RemoteClusterStateService implements Closeable {
Setting.Property.NodeScope
);

/**
* Controls the fixed prefix for the cluster state path on remote store.
*/
public static final Setting<String> CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString(
"cluster.remote_store.state.path.prefix",
"",
Property.NodeScope,
Property.Final
);

/**
* Validation mode for cluster state checksum.
* None: Validation will be disabled.
Expand Down Expand Up @@ -211,6 +221,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private final boolean isPublicationEnabled;
private final String remotePathPrefix;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
Expand Down Expand Up @@ -252,6 +263,7 @@ public RemoteClusterStateService(
this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
repositoriesService,
settings,
Expand Down Expand Up @@ -728,7 +740,10 @@ UploadedMetadataResults writeMetadataInParallel(
indexMetadata,
clusterState.metadata().clusterUUID(),
blobStoreRepository.getCompressor(),
blobStoreRepository.getNamedXContentRegistry()
blobStoreRepository.getNamedXContentRegistry(),
remoteIndexMetadataManager.getPathTypeSetting(),
remoteIndexMetadataManager.getPathHashAlgoSetting(),
remotePathPrefix
),
listener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.model.RemoteIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -44,11 +45,38 @@ public class RemoteIndexMetadataManager extends AbstractRemoteWritableEntityMana
Setting.Property.Deprecated
);

/**
* This setting is used to set the remote index metadata blob store path type strategy.
*/
public static final Setting<RemoteStoreEnums.PathType> REMOTE_INDEX_METADATA_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.index_metadata.path_type",
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
RemoteStoreEnums.PathType::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* This setting is used to set the remote index metadata blob store path hash algorithm strategy.
* This setting will come to effect if the {@link #REMOTE_INDEX_METADATA_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING = new Setting<>(
"cluster.remote_store.index_metadata.path_hash_algo",
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
RemoteStoreEnums.PathHashAlgorithm::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;

private volatile TimeValue indexMetadataUploadTimeout;

private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;

public RemoteIndexMetadataManager(
ClusterSettings clusterSettings,
String clusterName,
Expand All @@ -70,7 +98,11 @@ public RemoteIndexMetadataManager(
this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry();
this.compressor = blobStoreRepository.getCompressor();
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.pathType = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING);
this.pathHashAlgo = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING, this::setPathTypeSetting);
clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
}

/**
Expand Down Expand Up @@ -127,4 +159,20 @@ protected ActionListener<Object> getWrappedReadListener(
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
);
}

private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
this.pathType = pathType;
}

private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
this.pathHashAlgo = pathHashAlgo;
}

protected RemoteStoreEnums.PathType getPathTypeSetting() {
return pathType;
}

protected RemoteStoreEnums.PathHashAlgorithm getPathHashAlgoSetting() {
return pathHashAlgo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.gateway.remote.model;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
Expand All @@ -17,6 +18,8 @@
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

Expand All @@ -41,15 +44,24 @@ public class RemoteIndexMetadata extends AbstractClusterMetadataWriteableBlobEnt
public static final String INDEX = "index";

private IndexMetadata indexMetadata;
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;
private String fixedPrefix;

public RemoteIndexMetadata(
final IndexMetadata indexMetadata,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
final NamedXContentRegistry namedXContentRegistry,
final RemoteStoreEnums.PathType pathType,
final RemoteStoreEnums.PathHashAlgorithm pathHashAlgo,
final String fixedPrefix
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.indexMetadata = indexMetadata;
this.pathType = pathType;
this.pathHashAlgo = pathHashAlgo;
this.fixedPrefix = fixedPrefix;
}

public RemoteIndexMetadata(
Expand Down Expand Up @@ -86,6 +98,22 @@ public String generateBlobFileName() {
return blobFileName;
}

@Override
public BlobPath getPrefixedPath(BlobPath blobPath) {
if (pathType == null) {
return blobPath;
}
assert pathHashAlgo != null;
return pathType.path(
RemoteStorePathStrategy.PathInput.builder()
.fixedPrefix(fixedPrefix)
.basePath(blobPath)
.indexUUID(indexMetadata.getIndexUUID())
.build(),
pathHashAlgo
);
}

@Override
public UploadedMetadata getUploadedMetadata() {
assert blobName != null;
Expand Down
Loading

0 comments on commit 1935650

Please sign in to comment.