Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add version details in remote index path file with code enhancements #13386

Merged
merged 4 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteIndexPath;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.Locale;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
Expand Down Expand Up @@ -81,28 +83,29 @@ public void testRemoteIndexPathFileCreation() throws ExecutionException, Interru

}

private void validateRemoteIndexPathFile(boolean exists) {
private void validateRemoteIndexPathFile(boolean exists) throws IOException {
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

String fileName = generatePartFileName(indexUUID);
assertEquals(exists, FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR)));
assertEquals(
exists,
FileSystemUtils.exists(
translogRepoPath.resolve(RemoteIndexPath.DIR)
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
)
);
assertEquals(exists, FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
assertEquals(
exists,
FileSystemUtils.exists(
segmentRepoPath.resolve(RemoteIndexPath.DIR)
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
)
);
if (exists) {
Path[] files = FileSystemUtils.files(translogRepoPath.resolve(RemoteIndexPath.DIR));
assertEquals(1, files.length);
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileName)));
String translogPathFile = files[0].toString();
assertTrue(FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
files = FileSystemUtils.files(segmentRepoPath.resolve(RemoteIndexPath.DIR));
assertEquals(1, files.length);
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileName)));
String segmentPathFile = files[0].toString();
assertNotEquals(translogPathFile, segmentPathFile);
}
}

private String generatePartFileName(String indexUUID) {
return String.join(RemoteIndexPathUploader.DELIMITER, indexUUID, "2", RemoteIndexPath.DEFAULT_VERSION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -41,9 +42,17 @@ public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName)
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
* @param actionListener listener to be invoked on success or failure.
*/
public final void onNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener));
public final void onUpload(
List<IndexMetadata> indexMetadataList,
Map<String, IndexMetadata> prevIndexMetadataByName,
ActionListener<Void> actionListener
) {
executorService.execute(() -> doOnUpload(indexMetadataList, prevIndexMetadataByName, actionListener));
}

protected abstract void doOnNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);
protected abstract void doOnUpload(
List<IndexMetadata> indexMetadataList,
Map<String, IndexMetadata> prevIndexMetadataByName,
ActionListener<Void> actionListener
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
toUpload,
ClusterState.UNKNOWN_UUID.equals(previousClusterUUID) ? toUpload : Collections.emptyList()
Collections.emptyMap()
);
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
Expand Down Expand Up @@ -307,9 +307,9 @@ public ClusterMetadataManifest writeIncrementalMetadata(
}

// Write Index Metadata
final Map<String, Long> previousStateIndexMetadataVersionByName = new HashMap<>();
final Map<String, IndexMetadata> previousStateIndexMetadataByName = new HashMap<>();
for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion());
previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata);
}

int numIndicesUpdated = 0;
Expand All @@ -319,9 +319,12 @@ public ClusterMetadataManifest writeIncrementalMetadata(
.collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));

List<IndexMetadata> toUpload = new ArrayList<>();
List<IndexMetadata> newIndexMetadataList = new ArrayList<>();
// We prepare a map that contains the previous index metadata for the indexes for which version has changed.
Map<String, IndexMetadata> prevIndexMetadataByName = new HashMap<>();
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
String indexName = indexMetadata.getIndex().getName();
final IndexMetadata prevIndexMetadata = previousStateIndexMetadataByName.get(indexName);
Long previousVersion = prevIndexMetadata != null ? prevIndexMetadata.getVersion() : null;
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
logger.debug(
"updating metadata for [{}], changing version from [{}] to [{}]",
Expand All @@ -331,22 +334,19 @@ public ClusterMetadataManifest writeIncrementalMetadata(
);
numIndicesUpdated++;
toUpload.add(indexMetadata);
prevIndexMetadataByName.put(indexName, prevIndexMetadata);
} else {
numIndicesUnchanged++;
}
previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
// Adding the indexMetadata to newIndexMetadataList if there is no previous version present for the index.
if (previousVersion == null) {
newIndexMetadataList.add(indexMetadata);
}
previousStateIndexMetadataByName.remove(indexMetadata.getIndex().getName());
}

List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList);
List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, prevIndexMetadataByName);
uploadedIndexMetadataList.forEach(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);

for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
for (String removedIndexName : previousStateIndexMetadataByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
final ClusterMetadataManifest manifest = uploadManifest(
Expand Down Expand Up @@ -452,7 +452,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
List<IndexMetadata> newIndexMetadataList
Map<String, IndexMetadata> prevIndexMetadataByName
) throws IOException {
assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null";
int latchCount = toUpload.size() + indexMetadataUploadListeners.size();
Expand Down Expand Up @@ -482,7 +482,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener);
}

invokeIndexMetadataUploadListeners(newIndexMetadataList, latch, exceptionList);
invokeIndexMetadataUploadListeners(toUpload, prevIndexMetadataByName, latch, exceptionList);

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Expand Down Expand Up @@ -527,22 +527,25 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
* Invokes the index metadata upload listener but does not wait for the execution to complete.
*/
private void invokeIndexMetadataUploadListeners(
List<IndexMetadata> newIndexMetadataList,
List<IndexMetadata> udpatedIndexMetadataList,
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, IndexMetadata> prevIndexMetadataByName,
CountDownLatch latch,
List<Exception> exceptionList
) {
for (IndexMetadataUploadListener listener : indexMetadataUploadListeners) {
String listenerName = listener.getClass().getSimpleName();
listener.onNewIndexUpload(
newIndexMetadataList,
getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName)
listener.onUpload(
udpatedIndexMetadataList,
prevIndexMetadataByName,
getIndexMetadataUploadActionListener(udpatedIndexMetadataList, prevIndexMetadataByName, latch, exceptionList, listenerName)
);
}

}

private ActionListener<Void> getIndexMetadataUploadActionListener(
List<IndexMetadata> newIndexMetadataList,
Map<String, IndexMetadata> prevIndexMetadataByName,
CountDownLatch latch,
List<Exception> exceptionList,
String listenerName
Expand All @@ -552,18 +555,20 @@ private ActionListener<Void> getIndexMetadataUploadActionListener(
ActionListener.wrap(
ignored -> logger.trace(
new ParameterizedMessage(
"{} : Invoked listener={} successfully tookTimeNs={}",
"listener={} : Invoked successfully with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}",
listenerName,
newIndexMetadataList,
prevIndexMetadataByName.values(),
(System.nanoTime() - startTime)
)
),
ex -> {
logger.error(
new ParameterizedMessage(
"{} : Exception during invocation of listener={} tookTimeNs={}",
"listener={} : Exception during invocation with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}",
listenerName,
newIndexMetadataList,
prevIndexMetadataByName.values(),
(System.nanoTime() - startTime)
),
ex
Expand Down
19 changes: 2 additions & 17 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
Expand All @@ -62,8 +61,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -990,7 +987,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
*/
widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0);
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = determineRemoteStorePathStrategy();
remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata, true);

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
Expand Down Expand Up @@ -1911,18 +1908,6 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
}

private RemoteStorePathStrategy determineRemoteStorePathStrategy() {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME);
if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) {
PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME));
String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME);
PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null;
return new RemoteStorePathStrategy(pathType, hashAlgorithm);
}
return new RemoteStorePathStrategy(PathType.FIXED);
}

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ public class RemoteIndexPath implements ToXContentFragment {
combinedPath.putAll(SEGMENT_PATH);
COMBINED_PATH = Collections.unmodifiableMap(combinedPath);
}
private static final String DEFAULT_VERSION = "1";
public static final String DEFAULT_VERSION = "1";
public static final String DIR = "remote-index-path";
public static final String FILE_NAME_FORMAT = "remote_path_%s";
static final String KEY_VERSION = "version";
static final String KEY_INDEX_UUID = "index_uuid";
static final String KEY_SHARD_COUNT = "shard_count";
static final String KEY_PATH_CREATION_MAP = "path_creation_map";
static final String KEY_PATHS = "paths";

private final String version;
private final String indexUUID;
private final int shardCount;
private final Iterable<String> basePath;
Expand Down Expand Up @@ -109,6 +111,7 @@ public RemoteIndexPath(
.getFormattedMessage()
);
}
this.version = DEFAULT_VERSION;
this.indexUUID = indexUUID;
this.shardCount = shardCount;
this.basePath = basePath;
Expand All @@ -119,7 +122,7 @@ public RemoteIndexPath(

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(KEY_VERSION, DEFAULT_VERSION);
builder.field(KEY_VERSION, version);
builder.field(KEY_INDEX_UUID, indexUUID);
builder.field(KEY_SHARD_COUNT, shardCount);
builder.field(PathType.NAME, pathType.name());
Expand Down Expand Up @@ -156,4 +159,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public static RemoteIndexPath fromXContent(XContentParser ignored) {
throw new UnsupportedOperationException("RemoteIndexPath.fromXContent() is not supported");
}

String getVersion() {
return version;
}
}
Loading
Loading