Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance searchable snapshots to enable a read-only view of older snapshots #5429

Merged
merged 10 commits into from
Jan 5, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
if (IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.get(indexMetadata.getSettings())) {
addIndexBlock(indexName, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
}
if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1825,10 +1825,11 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
throw new IllegalArgumentException("Unexpected token " + token);
}
}
if (Assertions.ENABLED) {
// Reference:
// https://github.com/opensearch-project/OpenSearch/blob/4dde0f2a3b445b2fc61dab29c5a2178967f4a3e3/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java#L1620-L1628
Version legacyVersion = LegacyESVersion.fromId(6050099);
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(legacyVersion)) {
assert mappingVersion : "mapping version should be present for indices";
}
if (Assertions.ENABLED) {
assert settingsVersion : "settings version should be present for indices";
}
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(LegacyESVersion.V_7_2_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
Expand Down
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.VectorValues;
Expand Down Expand Up @@ -162,6 +163,25 @@ public static SegmentInfos readSegmentInfos(Directory directory) throws IOExcept
return SegmentInfos.readLatestCommit(directory);
}

/**
* A variant of {@link #readSegmentInfos(Directory)} that supports reading indices written by
* older major versions of Lucene. The underlying implementation is a workaround since the
* "expert" readLatestCommit API is currently package-private in Lucene. First, all commits in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple questions that might be worth answering in this comment:

  • Why is org.apache.lucene.index.SegmentInfos.readLatestCommit(Directory, int) package private?
  • How does this implementation differ from the built-in (package-private) Lucene version?
  • Is there a plan to swap this implementation out for the built-in Lucene version at some point?

Copy link
Member Author

@kartg kartg Dec 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. In a word - 🤷 The Lucene commit that introduced it kept it package-private, with the only public access to it through DirectoryReader.open which is the workaround used in this method implementation.

  1. The expert API in Lucene fetches only the latest segments_N file and reads the SegmentInfos object from it. The implementation here does the following:
  • Under DirectoryReader.listCommits, It reads the SegmentInfos object for the latest segments_N file as well as for all older segments_N files. These are returned as a sorted list of IndexCommit objects which are created from the corresponding SegmentInfosbut do not hold a reference to them.
  • The latest IndexCommit object is then used to open a DirectoryReader. Underneath, the IndexCommit is transformed to a SegmentInfos object by the same method as the expert API. This SegmentInfos object is the one that is finally returned from this method.

  1. I don't have any information on when Lucene plans on changing the access modifier, so I don't have a plan 😞 though I certainly want to swap this out for the Lucene version.

Copy link
Member Author

@kartg kartg Dec 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should also note that the two code paths described in point 2 above have an associated test case in LuceneTests to check that they are equivalent -testReadSegmentInfosExtendedCompatibilityBaseCase

* the given {@link Directory} are listed - this result includes older Lucene commits. Then,
* the latest index commit is opened via {@link DirectoryReader} by including a minimum supported
* Lucene major version based on the minimum compatibility of the given {@link org.opensearch.Version}.
*/
public static SegmentInfos readSegmentInfosExtendedCompatibility(Directory directory, org.opensearch.Version minimumVersion)
throws IOException {
// This list is sorted from oldest to latest
List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
IndexCommit latestCommit = indexCommits.get(indexCommits.size() - 1);
final int minSupportedLuceneMajor = minimumVersion.minimumIndexCompatibilityVersion().luceneVersion.major;
try (StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(latestCommit, minSupportedLuceneMajor, null)) {
return reader.getSegmentInfos();
}
}

/**
* Returns an iterable that allows to iterate over all files in this segments info
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public class FeatureFlags {
*/
public static final String SEARCHABLE_SNAPSHOT = "opensearch.experimental.feature.searchable_snapshot.enabled";

/**
* Gates the ability for Searchable Snapshots to read snapshots that are older than the
* guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis.
*/
public static final String SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a bad idea, but can we consider not adding another feature flag for this? If we need to promote searchable snapshots out of experimental without the extended compatibility support, we can put a new feature flag in at that time.

Copy link
Member Author

@kartg kartg Dec 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to remove it if you feel strongly, but I'd prefer to keep it - Lucene compatibility guarantees that we can read an N-1 snapshot, but support for older versions is on a best-effort basis. Taking this more conservative route ensures that we don't inadvertently expose this and give users the impressions that extended backward compatibility is stable.

Moreover, given that the underlying access relies on a workaround until Lucene makes the "expert" API public, I wouldn't be comfortable removing the feature flag until we can remove the workaround

"opensearch.experimental.feature.searchable_snapshot.extended_compatibility.enabled";

/**
* Gates the functionality of extensions.
* Once the feature is ready for production release, this feature flag can be removed.
Expand Down
14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,19 @@ public boolean match(String setting) {
}

/**
* Convenience method to check whether the given IndexSettings contains
* an {@link #INDEX_STORE_TYPE_SETTING} set to the value of this type.
* Convenience method to check whether the given {@link IndexSettings}
* object contains an {@link #INDEX_STORE_TYPE_SETTING} set to the value of this type.
*/
public boolean match(IndexSettings settings) {
return match(INDEX_STORE_TYPE_SETTING.get(settings.getSettings()));
return match(settings.getSettings());
}

/**
* Convenience method to check whether the given {@link Settings}
* object contains an {@link #INDEX_STORE_TYPE_SETTING} set to the value of this type.
*/
public boolean match(Settings settings) {
return match(INDEX_STORE_TYPE_SETTING.get(settings));
}
}

Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
Expand All @@ -60,11 +61,13 @@
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;

/**
* This class encapsulates all index level settings and handles settings updates.
Expand Down Expand Up @@ -585,6 +588,8 @@ public final class IndexSettings {
private final boolean isRemoteStoreEnabled;
private final String remoteStoreRepository;
private final boolean isRemoteTranslogStoreEnabled;
private final boolean isRemoteSnapshot;
private Version extendedCompatibilitySnapshotVersion;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
private volatile IndexMetadata indexMetadata;
Expand Down Expand Up @@ -747,6 +752,13 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
extendedCompatibilitySnapshotVersion = SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
} else {
extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
}
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -1012,6 +1024,22 @@ public boolean isRemoteTranslogStoreEnabled() {
return isRemoteTranslogStoreEnabled;
}

/**
* Returns true if this is remote/searchable snapshot
*/
public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
}

/**
* If this is a remote snapshot and the extended compatibility
* feature flag is enabled, this returns the minimum {@link Version}
* supported. In all other cases, the return value is null.
*/
public Version getExtendedCompatibilitySnapshotVersion() {
return extendedCompatibilitySnapshotVersion;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class ReadOnlyEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final boolean requireCompleteHistory;
private final TranslogManager translogManager;
private final Version minimumSupportedVersion;
kartg marked this conversation as resolved.
Show resolved Hide resolved

protected volatile TranslogStats translogStats;

Expand All @@ -119,6 +120,8 @@ public ReadOnlyEngine(
) {
super(config);
this.requireCompleteHistory = requireCompleteHistory;
// fetch the minimum Version for extended backward compatibility use-cases
this.minimumSupportedVersion = config.getIndexSettings().getExtendedCompatibilitySnapshotVersion();
try {
Store store = config.getStore();
store.incRef();
Expand All @@ -130,7 +133,11 @@ public ReadOnlyEngine(
// we obtain the IW lock even though we never modify the index.
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
if (isExtendedCompatibility()) {
this.lastCommittedSegmentInfos = Lucene.readSegmentInfosExtendedCompatibility(directory, this.minimumSupportedVersion);
} else {
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
}
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Expand Down Expand Up @@ -223,7 +230,17 @@ protected final OpenSearchDirectoryReader wrapReader(

protected DirectoryReader open(IndexCommit commit) throws IOException {
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
DirectoryReader reader;
if (isExtendedCompatibility()) {
reader = DirectoryReader.open(commit, this.minimumSupportedVersion.luceneVersion.major, null);
} else {
reader = DirectoryReader.open(commit);
}
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}

private boolean isExtendedCompatibility() {
return Version.CURRENT.minimumIndexCompatibilityVersion().onOrAfter(this.minimumSupportedVersion);
}

@Override
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};

// Do not load the global checkpoint if this is a remote snapshot index
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings) == false) {
if (indexSettings.isRemoteSnapshot() == false) {
loadGlobalCheckpointToReplicationTracker();
}

Expand Down Expand Up @@ -2039,7 +2039,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
final Map<String, String> userData = fetchUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
Expand All @@ -2054,6 +2054,16 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
return true;
}

private Map<String, String> fetchUserData() throws IOException {
if (indexSettings.isRemoteSnapshot() && indexSettings.getExtendedCompatibilitySnapshotVersion() != null) {
// Inefficient method to support reading old Lucene indexes
return Lucene.readSegmentInfosExtendedCompatibility(store.directory(), indexSettings.getExtendedCompatibilitySnapshotVersion())
.getUserData();
} else {
return SegmentInfos.readLatestCommit(store.directory()).getUserData();
}
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
Expand Down
30 changes: 28 additions & 2 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,21 @@ public Directory directory() {
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
kartg marked this conversation as resolved.
Show resolved Hide resolved
failIfCorrupted();
try {
return readSegmentsInfo(null, directory());
if (indexSettings.isRemoteSnapshot() && indexSettings.getExtendedCompatibilitySnapshotVersion() != null) {
return readSegmentInfosExtendedCompatibility(directory(), indexSettings.getExtendedCompatibilitySnapshotVersion());
} else {
return readSegmentsInfo(null, directory());
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;
}
}

/**
* Returns the segments info for the given commit or for the latest commit if the given commit is <code>null</code>
* Returns the segments info for the given commit or for the latest commit if the given commit is <code>null</code>.
* This method will throw an exception if the index is older than the standard backwards compatibility
* policy ( current major - 1). See also {@link #readSegmentInfosExtendedCompatibility(Directory, org.opensearch.Version)}.
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
Expand All @@ -245,7 +251,27 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
} catch (Exception ex) {
throw new CorruptIndexException("Hit unexpected exception while reading segment infos", "commit(" + commit + ")", ex);
}
}

/**
* Returns the segments info for the latest commit in the given directory. Unlike
* {@link #readSegmentsInfo(IndexCommit, Directory)}, this method supports reading
* older Lucene indices on a best-effort basis.
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
private static SegmentInfos readSegmentInfosExtendedCompatibility(Directory directory, org.opensearch.Version minimumVersion)
throws IOException {
try {
return Lucene.readSegmentInfosExtendedCompatibility(directory, minimumVersion);
} catch (EOFException eof) {
// TODO this should be caught by lucene - EOF is almost certainly an index corruption
throw new CorruptIndexException("Read past EOF while reading segment infos", "<latest-commit>", eof);
} catch (IOException exception) {
throw exception; // IOExceptions like too many open files are not necessarily a corruption - just bubble it up
} catch (Exception ex) {
throw new CorruptIndexException("Hit unexpected exception while reading segment infos", "<latest-commit>", ex);
}
}

final void ensureOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NoLockFactory;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.file.OnDemandVirtualFileSnapshotIndexInput;
Expand All @@ -35,6 +37,9 @@
* @opensearch.internal
*/
public final class RemoteSnapshotDirectory extends Directory {

public static final Version SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION = LegacyESVersion.fromId(6000099);
kartg marked this conversation as resolved.
Show resolved Hide resolved

private static final String VIRTUAL_FILE_PREFIX = BlobStoreRepository.VIRTUAL_DATA_BLOB_PREFIX;

private final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfoMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
if (idxSettings.isSegRepEnabled()) {
return new NRTReplicationEngineFactory();
}
if (IndexModule.Type.REMOTE_SNAPSHOT.match(idxSettings)) {
if (idxSettings.isRemoteSnapshot()) {
return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
}
return new InternalEngineFactory();
Expand Down
Loading