Skip to content

Commit

Permalink
Add version details in remote index path file with code enhancements
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 25, 2024
1 parent a8008e2 commit 1fb08f0
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 88 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteIndexPath;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -87,21 +88,20 @@ private void validateRemoteIndexPathFile(boolean exists) {
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

String fileName = RemoteIndexPathUploader.generateFileName(indexUUID, 2L, RemoteIndexPath.DEFAULT_VERSION);
assertEquals(exists, FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR)));
assertEquals(
exists,
FileSystemUtils.exists(
translogRepoPath.resolve(RemoteIndexPath.DIR)
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, fileName))
)
);
assertEquals(exists, FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
assertEquals(
exists,
FileSystemUtils.exists(
segmentRepoPath.resolve(RemoteIndexPath.DIR)
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
segmentRepoPath.resolve(RemoteIndexPath.DIR).resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, fileName))
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -41,9 +42,17 @@ public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName)
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
* @param actionListener listener to be invoked on success or failure.
*/
public final void onNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener));
public final void onUpload(
List<IndexMetadata> indexMetadataList,
Map<String, IndexMetadata> prevIndexMetadataByName,
ActionListener<Void> actionListener
) {
executorService.execute(() -> doOnUpload(indexMetadataList, prevIndexMetadataByName, actionListener));
}

protected abstract void doOnNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);
protected abstract void doOnUpload(
List<IndexMetadata> indexMetadataList,
Map<String, IndexMetadata> prevIndexMetadataByName,
ActionListener<Void> actionListener
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
toUpload,
ClusterState.UNKNOWN_UUID.equals(previousClusterUUID) ? toUpload : Collections.emptyList()
Collections.emptyMap()
);
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
Expand Down Expand Up @@ -307,9 +307,9 @@ public ClusterMetadataManifest writeIncrementalMetadata(
}

// Write Index Metadata
final Map<String, Long> previousStateIndexMetadataVersionByName = new HashMap<>();
final Map<String, IndexMetadata> previousStateIndexMetadataByName = new HashMap<>();
for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion());
previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata);
}

int numIndicesUpdated = 0;
Expand All @@ -319,9 +319,12 @@ public ClusterMetadataManifest writeIncrementalMetadata(
.collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));

List<IndexMetadata> toUpload = new ArrayList<>();
List<IndexMetadata> newIndexMetadataList = new ArrayList<>();
// We prepare a map that contains the previous index metadata for the indexes for which version has changed.
Map<String, IndexMetadata> prevIndexMetadataByName = new HashMap<>();
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
String indexName = indexMetadata.getIndex().getName();
final IndexMetadata prevIndexMetadata = previousStateIndexMetadataByName.get(indexName);
Long previousVersion = prevIndexMetadata != null ? prevIndexMetadata.getVersion() : null;
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
logger.debug(
"updating metadata for [{}], changing version from [{}] to [{}]",
Expand All @@ -331,22 +334,19 @@ public ClusterMetadataManifest writeIncrementalMetadata(
);
numIndicesUpdated++;
toUpload.add(indexMetadata);
prevIndexMetadataByName.put(indexName, prevIndexMetadata);
} else {
numIndicesUnchanged++;
}
previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
// Adding the indexMetadata to newIndexMetadataList if there is no previous version present for the index.
if (previousVersion == null) {
newIndexMetadataList.add(indexMetadata);
}
previousStateIndexMetadataByName.remove(indexMetadata.getIndex().getName());
}

List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList);
List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, prevIndexMetadataByName);
uploadedIndexMetadataList.forEach(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);

for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
for (String removedIndexName : previousStateIndexMetadataByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
final ClusterMetadataManifest manifest = uploadManifest(
Expand Down Expand Up @@ -452,7 +452,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
List<IndexMetadata> newIndexMetadataList
Map<String, IndexMetadata> prevIndexMetadataByName
) throws IOException {
assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null";
int latchCount = toUpload.size() + indexMetadataUploadListeners.size();
Expand Down Expand Up @@ -482,7 +482,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener);
}

invokeIndexMetadataUploadListeners(newIndexMetadataList, latch, exceptionList);
invokeIndexMetadataUploadListeners(toUpload, prevIndexMetadataByName, latch, exceptionList);

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Expand Down Expand Up @@ -527,22 +527,25 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
* Invokes the index metadata upload listener but does not wait for the execution to complete.
*/
private void invokeIndexMetadataUploadListeners(
List<IndexMetadata> newIndexMetadataList,
List<IndexMetadata> udpatedIndexMetadataList,
Map<String, IndexMetadata> prevIndexMetadataByName,
CountDownLatch latch,
List<Exception> exceptionList
) {
for (IndexMetadataUploadListener listener : indexMetadataUploadListeners) {
String listenerName = listener.getClass().getSimpleName();
listener.onNewIndexUpload(
newIndexMetadataList,
getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName)
listener.onUpload(
udpatedIndexMetadataList,
prevIndexMetadataByName,
getIndexMetadataUploadActionListener(udpatedIndexMetadataList, prevIndexMetadataByName, latch, exceptionList, listenerName)
);
}

}

private ActionListener<Void> getIndexMetadataUploadActionListener(
List<IndexMetadata> newIndexMetadataList,
Map<String, IndexMetadata> prevIndexMetadataByName,
CountDownLatch latch,
List<Exception> exceptionList,
String listenerName
Expand All @@ -552,18 +555,20 @@ private ActionListener<Void> getIndexMetadataUploadActionListener(
ActionListener.wrap(
ignored -> logger.trace(
new ParameterizedMessage(
"{} : Invoked listener={} successfully tookTimeNs={}",
"listener={} : Invoked successfully with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}",
listenerName,
newIndexMetadataList,
prevIndexMetadataByName.values(),
(System.nanoTime() - startTime)
)
),
ex -> {
logger.error(
new ParameterizedMessage(
"{} : Exception during invocation of listener={} tookTimeNs={}",
"listener={} : Exception during invocation with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}",
listenerName,
newIndexMetadataList,
prevIndexMetadataByName.values(),
(System.nanoTime() - startTime)
),
ex
Expand Down
19 changes: 2 additions & 17 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
Expand All @@ -62,8 +61,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -990,7 +987,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
*/
widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0);
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = determineRemoteStorePathStrategy();
remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata, true);

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
Expand Down Expand Up @@ -1911,18 +1908,6 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
}

private RemoteStorePathStrategy determineRemoteStorePathStrategy() {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME);
if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) {
PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME));
String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME);
PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null;
return new RemoteStorePathStrategy(pathType, hashAlgorithm);
}
return new RemoteStorePathStrategy(PathType.FIXED);
}

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ public class RemoteIndexPath implements ToXContentFragment {
combinedPath.putAll(SEGMENT_PATH);
COMBINED_PATH = Collections.unmodifiableMap(combinedPath);
}
private static final String DEFAULT_VERSION = "1";
public static final String DEFAULT_VERSION = "1";
public static final String DIR = "remote-index-path";
public static final String FILE_NAME_FORMAT = "remote_path_%s";
static final String KEY_VERSION = "version";
static final String KEY_INDEX_UUID = "index_uuid";
static final String KEY_SHARD_COUNT = "shard_count";
static final String KEY_PATH_CREATION_MAP = "path_creation_map";
static final String KEY_PATHS = "paths";

private final String version;
private final String indexUUID;
private final int shardCount;
private final Iterable<String> basePath;
Expand Down Expand Up @@ -109,6 +111,7 @@ public RemoteIndexPath(
.getFormattedMessage()
);
}
this.version = DEFAULT_VERSION;
this.indexUUID = indexUUID;
this.shardCount = shardCount;
this.basePath = basePath;
Expand All @@ -119,7 +122,7 @@ public RemoteIndexPath(

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(KEY_VERSION, DEFAULT_VERSION);
builder.field(KEY_VERSION, version);
builder.field(KEY_INDEX_UUID, indexUUID);
builder.field(KEY_SHARD_COUNT, shardCount);
builder.field(PathType.NAME, pathType.name());
Expand Down Expand Up @@ -156,4 +159,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public static RemoteIndexPath fromXContent(XContentParser ignored) {
throw new UnsupportedOperationException("RemoteIndexPath.fromXContent() is not supported");
}

String getVersion() {
return version;
}
}
Loading

0 comments on commit 1fb08f0

Please sign in to comment.