Skip to content

Commit

Permalink
Metadata file read fix to include reading trailing metadata during fu…
Browse files Browse the repository at this point in the history
…ll content read

Signed-off-by: Vikas Bansal <43470111+vikasvb90@users.noreply.github.com>
  • Loading branch information
vikasvb90 committed Sep 4, 2023
1 parent fc76288 commit d3b938b
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private Map<String, BlobMetadata> convertToEncryptedMetadataMap(Map<String, Blob
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> new EncryptedBlobMetadata(entry.getValue(), cryptoHandler, getEncryptedHeaderContentSupplier(entry.getKey()))
entry -> new EncryptedBlobMetadata<>(entry.getValue(), cryptoHandler, getEncryptedHeaderContentSupplier(entry.getKey()))
)
);

Expand All @@ -176,7 +176,7 @@ public void listBlobsByPrefixInSortedOrder(
if (metadataList != null) {
List<BlobMetadata> encryptedMetadata = metadataList.stream()
.map(
blobMetadata -> new EncryptedBlobMetadata(
blobMetadata -> new EncryptedBlobMetadata<>(
blobMetadata,
cryptoHandler,
getEncryptedHeaderContentSupplier(blobMetadata.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ public String name() {

@Override
public long length() {
Object cryptoContext;
U cryptoContext;
try {
cryptoContext = cryptoHandler.loadEncryptionMetadata(encryptedHeaderContentSupplier);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
return cryptoHandler.estimateDecryptedLength((U) cryptoContext, delegate.length());
return cryptoHandler.estimateDecryptedLength(cryptoContext, delegate.length());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.crypto.CryptoManagerRegistry;
import org.opensearch.discovery.DiscoveryModule;
import org.opensearch.discovery.HandshakingTransportAddressConnector;
import org.opensearch.discovery.PeerFinder;
Expand Down Expand Up @@ -670,12 +669,7 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING,

// Crypto settings
CryptoManagerRegistry.CRYPTO_KEY_REFRESH_INTERVAL,
CryptoManagerRegistry.CRYPTO_ALGORITHM,
CryptoManagerRegistry.CRYPTO_KEY_CACHE_SIZE
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.cluster.metadata.CryptoMetadata;
import org.opensearch.common.SetOnce;
import org.opensearch.common.crypto.MasterKeyProvider;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.encryption.CryptoManager;
Expand All @@ -25,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/**
* During node bootstrap, installed key provider extensions responsible for generating data keys are loaded.
Expand All @@ -44,51 +42,14 @@ public class CryptoManagerRegistry {
private static volatile CryptoManagerRegistry instance;
private static final Object lock = new Object();

/**
* The crypto algorithm to be used by {@link CryptoManager} to encrypt data.
*/
public static final Setting<String> CRYPTO_ALGORITHM = new Setting<>(
"crypto.algorithm",
"ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY",
Function.identity(),
Setting.Property.NodeScope
);

/**
* Refresh interval for the rotation of crypto key used in encrypting data.
*/
public static final Setting<TimeValue> CRYPTO_KEY_REFRESH_INTERVAL = Setting.timeSetting(
"crypto.key.refresh_interval",
TimeValue.timeValueDays(2),
TimeValue.timeValueHours(1),
TimeValue.timeValueDays(10),
Setting.Property.NodeScope
);

/**
* Size of cache used for encryption keys.
*/
public static final Setting<Integer> CRYPTO_KEY_CACHE_SIZE = Setting.intSetting(
"crypto.key.cache_size",
500,
100,
Setting.Property.NodeScope
);

/**
* Initializes the registry with crypto factories for the installed crypto key providers.
*
* @param cryptoPlugins The list of installed crypto key provider plugins.
* @param settings Crypto settings.
*/
protected CryptoManagerRegistry(List<CryptoKeyProviderPlugin> cryptoPlugins, Settings settings) {
cryptoManagerFactory.set(
new CryptoManagerFactory(
CRYPTO_ALGORITHM.get(settings),
CRYPTO_KEY_REFRESH_INTERVAL.get(settings),
CRYPTO_KEY_CACHE_SIZE.get(settings)
)
);
cryptoManagerFactory.set(new CryptoManagerFactory("ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY", TimeValue.timeValueDays(2), 500));
registry.set(loadCryptoFactories(cryptoPlugins));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ public void onFailure(Exception e) {
}
}

/**
 * Returns the raw stream emitted by the blob object for the given file. Callers own the
 * returned stream and should open it in a try-with-resources (closeable) block so it is
 * always released.
 *
 * @param fileName Name of file
 * @return Stream from the blob object
 * @throws IOException if fetch of stream fails with IO error
 */
public InputStream getBlobStream(String fileName) throws IOException {
return blobContainer.readBlob(fileName);
}

/**
* Removes an existing file in the directory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -190,9 +191,8 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
}

private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws IOException {
try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) {
byte[] metadataBytes = new byte[(int) indexInput.length()];
indexInput.readBytes(metadataBytes, 0, (int) indexInput.length());
try (InputStream inputStream = remoteMetadataDirectory.getBlobStream(metadataFilename)) {
byte[] metadataBytes = inputStream.readAllBytes();
return metadataStreamWrapper.readStream(new ByteArrayIndexInput(metadataFilename, metadataBytes));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
segmentInfos = indexShardStore.readLastCommittedSegmentsInfo();
}

when(remoteMetadataDirectory.openInput(any(), any())).thenAnswer(
when(remoteMetadataDirectory.getBlobStream(any())).thenAnswer(
I -> createMetadataFileBytes(getDummyMetadata("_0", 1), indexShard.getLatestReplicationCheckpoint(), segmentInfos)
);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(
Expand All @@ -162,9 +162,9 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
// Since the thrown IOException is caught in the constructor, ctor should be invoked successfully.
new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteSegmentTransferTracker.class));

// Validate that the openInput method of remoteMetadataDirectory has been opened only once and the
// Validate that the stream of metadata file of remoteMetadataDirectory has been opened only once and the
// listFilesByPrefixInLexicographicOrder has been called twice.
verify(remoteMetadataDirectory, times(1)).openInput(any(), any());
verify(remoteMetadataDirectory, times(1)).getBlobStream(any());
verify(remoteMetadataDirectory, times(2)).listFilesByPrefixInLexicographicOrder(MetadataFilenameUtils.METADATA_PREFIX, 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.junit.After;
import org.junit.Before;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
Expand Down Expand Up @@ -223,21 +224,21 @@ private Map<String, Map<String, String>> populateMetadata() throws IOException {
getDummyMetadata("_0", 1)
);

when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenAnswer(
when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenAnswer(
I -> createMetadataFileBytes(
metadataFilenameContentMapping.get(metadataFilename),
indexShard.getLatestReplicationCheckpoint(),
segmentInfos
)
);
when(remoteMetadataDirectory.openInput(metadataFilename2, IOContext.DEFAULT)).thenAnswer(
when(remoteMetadataDirectory.getBlobStream(metadataFilename2)).thenAnswer(
I -> createMetadataFileBytes(
metadataFilenameContentMapping.get(metadataFilename2),
indexShard.getLatestReplicationCheckpoint(),
segmentInfos
)
);
when(remoteMetadataDirectory.openInput(metadataFilename3, IOContext.DEFAULT)).thenAnswer(
when(remoteMetadataDirectory.getBlobStream(metadataFilename3)).thenAnswer(
I -> createMetadataFileBytes(
metadataFilenameContentMapping.get(metadataFilename3),
indexShard.getLatestReplicationCheckpoint(),
Expand Down Expand Up @@ -584,7 +585,7 @@ public void testContainsFile() throws IOException {
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major);
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major);

when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(
when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenReturn(
createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos)
);

Expand Down Expand Up @@ -647,7 +648,7 @@ public void testUploadMetadataNonEmpty() throws IOException {
latestMetadataFileName,
getDummyMetadata("_0", (int) generation)
);
when(remoteMetadataDirectory.openInput(latestMetadataFileName, IOContext.DEFAULT)).thenReturn(
when(remoteMetadataDirectory.getBlobStream(latestMetadataFileName)).thenReturn(
createMetadataFileBytes(
metadataFilenameContentMapping.get(latestMetadataFileName),
indexShard.getLatestReplicationCheckpoint(),
Expand Down Expand Up @@ -750,8 +751,8 @@ public void testNoMetadataHeaderCorruptIndexException() throws IOException {
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);
indexOutput.writeMapOfStrings(metadata);
indexOutput.close();
ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
ByteArrayInputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenReturn(inputStream);

assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init());
}
Expand All @@ -775,8 +776,8 @@ public void testInvalidCodecHeaderCorruptIndexException() throws IOException {
indexOutput.writeMapOfStrings(metadata);
CodecUtil.writeFooter(indexOutput);
indexOutput.close();
ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
ByteArrayInputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenReturn(inputStream);

assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init());
}
Expand All @@ -800,8 +801,8 @@ public void testHeaderMinVersionCorruptIndexException() throws IOException {
indexOutput.writeMapOfStrings(metadata);
CodecUtil.writeFooter(indexOutput);
indexOutput.close();
ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
ByteArrayInputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenReturn(inputStream);

assertThrows(IndexFormatTooOldException.class, () -> remoteSegmentStoreDirectory.init());
}
Expand All @@ -825,8 +826,8 @@ public void testHeaderMaxVersionCorruptIndexException() throws IOException {
indexOutput.writeMapOfStrings(metadata);
CodecUtil.writeFooter(indexOutput);
indexOutput.close();
ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
ByteArrayInputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenReturn(inputStream);

assertThrows(IndexFormatTooNewException.class, () -> remoteSegmentStoreDirectory.init());
}
Expand Down Expand Up @@ -854,8 +855,8 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException {
CodecUtil.writeFooter(indexOutputSpy);
indexOutputSpy.close();

ByteArrayIndexInput byteArrayIndexInput = new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(byteArrayIndexInput);
ByteArrayInputStream inputStream = new ByteArrayInputStream(BytesReference.toBytes(output.bytes()));
when(remoteMetadataDirectory.getBlobStream(metadataFilename)).thenReturn(inputStream);

assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -43,7 +44,7 @@ private RemoteStoreTestUtils() {
 * @return InputStream: metadata file bytes with header and footer
* @throws IOException IOException
*/
public static ByteArrayIndexInput createMetadataFileBytes(
public static InputStream createMetadataFileBytes(
Map<String, String> segmentFilesMap,
ReplicationCheckpoint replicationCheckpoint,
SegmentInfos segmentInfos
Expand All @@ -61,7 +62,7 @@ public static ByteArrayIndexInput createMetadataFileBytes(
indexOutput.writeBytes(byteArray, byteArray.length);
CodecUtil.writeFooter(indexOutput);
indexOutput.close();
return new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
return new ByteArrayInputStream(BytesReference.toBytes(output.bytes()));
}

public static Map<String, String> getDummyMetadata(String prefix, int commitGeneration) {
Expand Down

0 comments on commit d3b938b

Please sign in to comment.