Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prefix support to hashed prefix & infix path types on remote store #15557

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -67,14 +68,16 @@ public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
assertBusy(() -> {
String shardPath = getShardLevelBlobPath(
client(),
indexName,
new BlobPath(),
String.valueOf(shardRouting.getId()),
SEGMENTS,
DATA
DATA,
segmentsPathFixedPrefix
).buildAsString();
Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
Client client = client();
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix).buildAsString();
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix).buildAsString();
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA, segmentsPathFixedPrefix).buildAsString();
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix).buildAsString();
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);

try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,16 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
Expand Down Expand Up @@ -236,7 +245,16 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
Expand All @@ -247,11 +265,19 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
internalCluster().startNode(settings);

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
Expand All @@ -271,7 +297,16 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Excepti
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
int actualFileCount = getFileCount(indexPath);
Expand Down Expand Up @@ -604,8 +639,10 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
indexBulk(INDEX_NAME, 50);
flushAndRefresh(INDEX_NAME);

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
// 3. Delete data from remote segment store
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
.buildAsString();
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(segmentDataPath)) {
Expand Down Expand Up @@ -844,7 +881,16 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA).buildAsString();
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(translogMetaDataPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
Expand Down Expand Up @@ -50,7 +51,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {

String indexName = response.getShards()[0].getShardRouting().index().getName();
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
.buildAsString();
Path segmentDataRepoPath = location.resolve(shardPath);
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,15 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {

final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
client(),
remoteStoreEnabledIndexName,
remoteStoreRepository.basePath(),
"0",
SEGMENTS,
LOCK_FILES
LOCK_FILES,
segmentsPathFixedPrefix
);
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
String[] lockFiles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
Expand Down Expand Up @@ -109,7 +110,8 @@
SnapshotsService snapshotsService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteStoreSettings remoteStoreSettings
) {
super(
CleanupRepositoryAction.NAME,
Expand All @@ -122,7 +124,10 @@
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(
() -> repositoriesService,

Check warning on line 128 in server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java#L128

Added line #L128 was not covered by tests
remoteStoreSettings.getSegmentsPathFixedPrefix()
);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX,

SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;
import org.opensearch.index.remote.RemoteStorePathStrategy.ShardDataPathInput;
import org.opensearch.indices.RemoteStoreSettings;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class RemoteIndexPath implements ToXContentFragment {
private final Iterable<String> basePath;
private final PathType pathType;
private final PathHashAlgorithm pathHashAlgorithm;
private final RemoteStoreSettings remoteStoreSettings;

/**
* This keeps the map of paths that would be present in the content of the index path file. For eg - It is possible
Expand All @@ -82,7 +84,8 @@ public RemoteIndexPath(
Iterable<String> basePath,
PathType pathType,
PathHashAlgorithm pathHashAlgorithm,
Map<DataCategory, List<DataType>> pathCreationMap
Map<DataCategory, List<DataType>> pathCreationMap,
RemoteStoreSettings remoteStoreSettings
) {
if (Objects.isNull(pathCreationMap)
|| Objects.isNull(pathType)
Expand Down Expand Up @@ -119,6 +122,7 @@ public RemoteIndexPath(
this.pathType = pathType;
this.pathHashAlgorithm = pathHashAlgorithm;
this.pathCreationMap = pathCreationMap;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand Down Expand Up @@ -148,6 +152,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.shardId(Integer.toString(shardNo))
.dataCategory(dataCategory)
.dataType(type)
.fixedPrefix(
dataCategory == TRANSLOG
? remoteStoreSettings.getTranslogPathFixedPrefix()
: remoteStoreSettings.getSegmentsPathFixedPrefix()
)
.build();
builder.value(pathType.path(pathInput, pathHashAlgorithm).buildAsString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.gateway.remote.IndexMetadataUploadListener;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {
private final Settings settings;
private final boolean isRemoteDataAttributePresent;
private final boolean isTranslogSegmentRepoSame;
private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<RepositoriesService> repositoriesService;
private volatile TimeValue metadataUploadTimeout;

Expand All @@ -89,7 +91,8 @@ public RemoteIndexPathUploader(
ThreadPool threadPool,
Settings settings,
Supplier<RepositoriesService> repositoriesService,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
RemoteStoreSettings remoteStoreSettings
) {
super(threadPool, ThreadPool.Names.GENERIC);
this.settings = Objects.requireNonNull(settings);
Expand All @@ -100,6 +103,7 @@ public RemoteIndexPathUploader(
Objects.requireNonNull(clusterSettings);
metadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setMetadataUploadTimeout);
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand Down Expand Up @@ -208,7 +212,8 @@ private void writePathToRemoteStore(
basePath,
pathType,
hashAlgorithm,
pathCreationMap
pathCreationMap,
remoteStoreSettings
);
String fileName = generateFileName(indexUUID, idxMD.getVersion(), remoteIndexPath.getVersion());
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(remoteIndexPath, blobContainer, fileName, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.hash.FNV1a;
import org.opensearch.core.common.Strings;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.util.HashMap;
Expand Down Expand Up @@ -107,7 +108,11 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return BlobPath.cleanPath().add(hashAlgorithm.hash(pathInput)).add(pathInput.basePath()).add(pathInput.fixedSubPath());
String fixedPrefix = pathInput.fixedPrefix();
return BlobPath.cleanPath()
.add(Strings.isNullOrEmpty(fixedPrefix) ? hashAlgorithm.hash(pathInput) : fixedPrefix + hashAlgorithm.hash(pathInput))
.add(pathInput.basePath())
.add(pathInput.fixedSubPath());
}

@Override
Expand All @@ -119,7 +124,10 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return pathInput.basePath().add(hashAlgorithm.hash(pathInput)).add(pathInput.fixedSubPath());
String fixedPrefix = pathInput.fixedPrefix();
return pathInput.basePath()
.add(Strings.isNullOrEmpty(fixedPrefix) ? hashAlgorithm.hash(pathInput) : fixedPrefix + hashAlgorithm.hash(pathInput))
.add(pathInput.fixedSubPath());
}

@Override
Expand Down
Loading
Loading