Skip to content

Commit

Permalink
Validate checksum of each segment file post download from remote store (
Browse files Browse the repository at this point in the history
#10119)

---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale and Sachin Kale authored Oct 12, 2023
1 parent 9c06228 commit 6c02261
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,39 @@

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.test.CorruptionUtils;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRestoreIT extends BaseRemoteStoreRestoreIT {

/**
Expand Down Expand Up @@ -461,5 +467,30 @@ public void testRateLimitedRemoteDownloads() throws Exception {
}
}

public void testRestoreCorruptSegmentShouldFail() throws IOException, ExecutionException, InterruptedException {
prepareCluster(1, 3, INDEX_NAME, 0, 1);
indexData(randomIntBetween(3, 4), true, INDEX_NAME);

GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest()).get();
String indexUUID = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID);

logger.info("--> Corrupting segment files in remote segment store");
Path path = segmentRepoPath.resolve(indexUUID).resolve("0").resolve("segments").resolve("data");
try (Stream<Path> dataPath = Files.list(path)) {
CorruptionUtils.corruptFile(random(), dataPath.toArray(Path[]::new));
}

logger.info("--> Stop primary");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));

logger.info("--> Close and restore the index");
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).waitForCompletion(true), PlainActionFuture.newFuture());

logger.info("--> Check for index status, should be red due to corruption");
ensureRed(INDEX_NAME);
}

// TODO: Restore flow - index aliases
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ public void setWrittenByMajor(int writtenByMajor) {
);
}
}

public int getWrittenByMajor() {
return writtenByMajor;
}
}

/**
Expand Down
47 changes: 45 additions & 2 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -120,6 +121,7 @@
import java.util.zip.CRC32;
import java.util.zip.Checksum;

import static java.lang.Character.MAX_RADIX;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
Expand Down Expand Up @@ -975,7 +977,11 @@ public void copyFrom(Directory from, String src, String dest, IOContext context)
boolean success = false;
long startTime = System.currentTimeMillis();
try {
super.copyFrom(from, src, dest, context);
if (from instanceof RemoteSegmentStoreDirectory) {
copyFileAndValidateChecksum(from, src, dest, context, fileSize);
} else {
super.copyFrom(from, src, dest, context);
}
success = true;
afterDownload(fileSize, startTime);
} finally {
Expand All @@ -985,6 +991,43 @@ public void copyFrom(Directory from, String src, String dest, IOContext context)
}
}

private void copyFileAndValidateChecksum(Directory from, String src, String dest, IOContext context, long fileSize)
throws IOException {
RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = ((RemoteSegmentStoreDirectory) from)
.getSegmentsUploadedToRemoteStore()
.get(dest);
boolean success = false;
try (IndexInput is = from.openInput(src, context); IndexOutput os = createOutput(dest, context)) {
// Here, we don't need the exact version as LuceneVerifyingIndexOutput does not verify version
// It is just used to emit logs when the entire metadata object is provided as parameter. Also,
// we can't provide null version as StoreFileMetadata has non-null check on writtenBy field.
Version luceneMajorVersion = Version.parse(metadata.getWrittenByMajor() + ".0.0");
long checksum = Long.parseLong(metadata.getChecksum());
StoreFileMetadata storeFileMetadata = new StoreFileMetadata(
dest,
fileSize,
Long.toString(checksum, MAX_RADIX),
luceneMajorVersion
);
VerifyingIndexOutput verifyingIndexOutput = new LuceneVerifyingIndexOutput(storeFileMetadata, os);
verifyingIndexOutput.copyBytes(is, is.length());
verifyingIndexOutput.verify();
success = true;
} catch (ParseException e) {
throw new IOException("Exception while reading version info for segment file from remote store: " + dest, e);
} finally {
if (success == false) {
// If the exception is thrown after file is created, we clean up the file.
// We ignore the exception as the deletion is best-effort basis and can fail if file does not exist.
try {
deleteFile("Quietly deleting", dest);
} catch (Exception e) {
// Ignore
}
}
}
}

/**
* Updates the amount of bytes attempted for download
*/
Expand Down Expand Up @@ -1476,7 +1519,7 @@ public static boolean isAutogenerated(String name) {
* Produces a string representation of the given digest value.
*/
public static String digestToString(long digest) {
return Long.toString(digest, Character.MAX_RADIX);
return Long.toString(digest, MAX_RADIX);
}

/**
Expand Down

0 comments on commit 6c02261

Please sign in to comment.