-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Remote Store] Add header(codec) and footer(checksum) to remote store…
… segment metadata (#5917) Signed-off-by: Varun Bansal <bansvaru@amazon.com>
- Loading branch information
Showing
9 changed files
with
717 additions
and
23 deletions.
There are no files selected for viewing
35 changes: 35 additions & 0 deletions
35
server/src/main/java/org/opensearch/common/io/IndexIOStreamHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.io; | ||
|
||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.IndexOutput; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Interface for reading/writing content streams to/from - {@link T} | ||
* @param <T> The type of content to be read/written to stream | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public interface IndexIOStreamHandler<T> { | ||
/** | ||
* Implements logic to read content from file input stream {@code indexInput} and parse into {@link T} | ||
* @param indexInput file input stream | ||
* @return content parsed to {@link T} | ||
*/ | ||
T readContent(IndexInput indexInput) throws IOException; | ||
|
||
/** | ||
* Implements logic to write content from {@code content} to file output stream {@code indexOutput} | ||
* @param indexOutput file input stream | ||
*/ | ||
void writeContent(IndexOutput indexOutput, T content) throws IOException; | ||
} |
112 changes: 112 additions & 0 deletions
112
server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.io; | ||
|
||
import java.io.IOException; | ||
|
||
import org.apache.lucene.codecs.CodecUtil; | ||
import org.apache.lucene.store.BufferedChecksumIndexInput; | ||
import org.apache.lucene.store.ChecksumIndexInput; | ||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.IndexOutput; | ||
|
||
/** | ||
* Manages versioning and checksum for a stream of content. | ||
* @param <T> Type of content to be read/written | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class VersionedCodecStreamWrapper<T> { | ||
// TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions | ||
private final IndexIOStreamHandler<T> indexIOStreamHandler; | ||
private final int currentVersion; | ||
private final String codec; | ||
|
||
/** | ||
* @param indexIOStreamHandler handler to read/write stream from T | ||
* @param currentVersion latest supported version of the stream | ||
* @param codec: stream codec | ||
*/ | ||
public VersionedCodecStreamWrapper(IndexIOStreamHandler<T> indexIOStreamHandler, int currentVersion, String codec) { | ||
this.indexIOStreamHandler = indexIOStreamHandler; | ||
this.currentVersion = currentVersion; | ||
this.codec = codec; | ||
} | ||
|
||
/** | ||
* Reads stream content from {@code indexInput} and parses the read content to {@link T}. | ||
* Before reading actual content, verifies the header with relevant codec and version. | ||
* After reading the actual content, verifies the checksum as well | ||
* @param indexInput file input stream | ||
* @return stream content parsed into {@link T} | ||
*/ | ||
public T readStream(IndexInput indexInput) throws IOException { | ||
ChecksumIndexInput checksumIndexInput = new BufferedChecksumIndexInput(indexInput); | ||
int readStreamVersion = checkHeader(checksumIndexInput); | ||
T content = getHandlerForVersion(readStreamVersion).readContent(checksumIndexInput); | ||
checkFooter(checksumIndexInput); | ||
return content; | ||
} | ||
|
||
/** | ||
* Writes to file output stream {@code indexOutput} | ||
* @param indexOutput file output stream which will store stream content | ||
* @param content stream content. | ||
*/ | ||
public void writeStream(IndexOutput indexOutput, T content) throws IOException { | ||
this.writeHeader(indexOutput); | ||
getHandlerForVersion(this.currentVersion).writeContent(indexOutput, content); | ||
this.writeFooter(indexOutput); | ||
} | ||
|
||
/** | ||
* Reads header from file input stream containing {@code this.codec} and {@code this.currentVersion}. | ||
* @param indexInput file input stream | ||
* @return header version found in the input stream | ||
*/ | ||
private int checkHeader(IndexInput indexInput) throws IOException { | ||
// TODO Once versioning strategy is decided we'll add support for min/max supported versions | ||
return CodecUtil.checkHeader(indexInput, this.codec, this.currentVersion, this.currentVersion); | ||
} | ||
|
||
/** | ||
* Reads footer from file input stream containing checksum. | ||
* The {@link IndexInput#getFilePointer()} should be at the footer start position. | ||
* @param indexInput file input stream | ||
*/ | ||
private void checkFooter(ChecksumIndexInput indexInput) throws IOException { | ||
CodecUtil.checkFooter(indexInput); | ||
} | ||
|
||
/** | ||
* Writes header with {@code this.codec} and {@code this.currentVersion} to the file output stream | ||
* @param indexOutput file output stream | ||
*/ | ||
private void writeHeader(IndexOutput indexOutput) throws IOException { | ||
CodecUtil.writeHeader(indexOutput, this.codec, this.currentVersion); | ||
} | ||
|
||
/** | ||
* Writes footer with checksum of contents of file output stream | ||
* @param indexOutput file output stream | ||
*/ | ||
private void writeFooter(IndexOutput indexOutput) throws IOException { | ||
CodecUtil.writeFooter(indexOutput); | ||
} | ||
|
||
/** | ||
* Returns relevant handler for the version | ||
* @param version stream content version | ||
*/ | ||
private IndexIOStreamHandler<T> getHandlerForVersion(int version) { | ||
// TODO implement factory and pick relevant handler based on version. | ||
// It should also take into account min and max supported versions | ||
return this.indexIOStreamHandler; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store.remote.metadata; | ||
|
||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import org.opensearch.index.store.RemoteSegmentStoreDirectory; | ||
|
||
/** | ||
* Metadata object for Remote Segment | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class RemoteSegmentMetadata { | ||
/** | ||
* Latest supported version of metadata | ||
*/ | ||
public static final int CURRENT_VERSION = 1; | ||
/** | ||
* Metadata codec | ||
*/ | ||
public static final String METADATA_CODEC = "segment_md"; | ||
|
||
/** | ||
* Data structure holding metadata content | ||
*/ | ||
private final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata; | ||
|
||
public RemoteSegmentMetadata(Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata) { | ||
this.metadata = metadata; | ||
} | ||
|
||
/** | ||
* Exposes underlying metadata content data structure. | ||
* @return {@code metadata} | ||
*/ | ||
public Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> getMetadata() { | ||
return this.metadata; | ||
} | ||
|
||
/** | ||
* Generate {@code Map<String, String>} from {@link RemoteSegmentMetadata} | ||
* @return {@code Map<String, String>} | ||
*/ | ||
public Map<String, String> toMapOfStrings() { | ||
return this.metadata.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString())); | ||
} | ||
|
||
/** | ||
* Generate {@link RemoteSegmentMetadata} from {@code segmentMetadata} | ||
* @param segmentMetadata metadata content in the form of {@code Map<String, String>} | ||
* @return {@link RemoteSegmentMetadata} | ||
*/ | ||
public static RemoteSegmentMetadata fromMapOfStrings(Map<String, String> segmentMetadata) { | ||
return new RemoteSegmentMetadata( | ||
segmentMetadata.entrySet() | ||
.stream() | ||
.collect( | ||
Collectors.toMap( | ||
Map.Entry::getKey, | ||
entry -> RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString(entry.getValue()) | ||
) | ||
) | ||
); | ||
} | ||
} |
42 changes: 42 additions & 0 deletions
42
...rc/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store.remote.metadata; | ||
|
||
import java.io.IOException; | ||
|
||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.IndexOutput; | ||
import org.opensearch.common.io.IndexIOStreamHandler; | ||
|
||
/** | ||
* Handler for {@link RemoteSegmentMetadata} | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class RemoteSegmentMetadataHandler implements IndexIOStreamHandler<RemoteSegmentMetadata> { | ||
/** | ||
* Reads metadata content from metadata file input stream and parsed into {@link RemoteSegmentMetadata} | ||
* @param indexInput metadata file input stream with {@link IndexInput#getFilePointer()} pointing to metadata content | ||
* @return {@link RemoteSegmentMetadata} | ||
*/ | ||
@Override | ||
public RemoteSegmentMetadata readContent(IndexInput indexInput) throws IOException { | ||
return RemoteSegmentMetadata.fromMapOfStrings(indexInput.readMapOfStrings()); | ||
} | ||
|
||
/** | ||
* Writes metadata to file output stream | ||
* @param indexOutput metadata file input stream | ||
* @param content {@link RemoteSegmentMetadata} from which metadata content would be generated | ||
*/ | ||
@Override | ||
public void writeContent(IndexOutput indexOutput, RemoteSegmentMetadata content) throws IOException { | ||
indexOutput.writeMapOfStrings(content.toMapOfStrings()); | ||
} | ||
} |
12 changes: 12 additions & 0 deletions
12
server/src/main/java/org/opensearch/index/store/remote/metadata/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** | ||
* Package containing metadata constructs for Remote Segment store | ||
*/ | ||
package org.opensearch.index.store.remote.metadata; |
Oops, something went wrong.