Skip to content

Commit

Permalink
Fetch all the locks for a shard to avoid multiple remote store calls (o…
Browse files Browse the repository at this point in the history
…pensearch-project#11987)

* Fetch all the locks for a shard to avoid multiple calls

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Fix lock file comparison issue

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Add unit tests

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Add more unit tests

Signed-off-by: Sachin Kale <kalsac@amazon.com>

* Address PR comments

Signed-off-by: Sachin Kale <kalsac@amazon.com>

---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
2 people authored and shiv0408 committed Apr 25, 2024
1 parent e7c7be8 commit a5566fb
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public RemoteSegmentMetadata init() throws IOException {
*/
public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration, String acquirerId) throws IOException {
String metadataFilePrefix = MetadataFilenameUtils.getMetadataFilePrefixForCommit(primaryTerm, commitGeneration);
String metadataFile = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLock(metadataFilePrefix, acquirerId);
String metadataFile = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFile(metadataFilePrefix, acquirerId);
RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
if (remoteSegmentMetadata != null) {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
Expand Down Expand Up @@ -749,20 +749,16 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
lastNMetadataFilesToKeep,
sortedMetadataFileList.size()
);
List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream().filter(metadataFile -> {
try {
return !isLockAcquired(metadataFile);
} catch (IOException e) {
logger.error(
"skipping metadata file ("
+ metadataFile
+ ") deletion for this run,"
+ " as checking lock for metadata is failing with error: "
+ e
);
return false;
}
}).collect(Collectors.toList());
Set<String> allLockFiles;
try {
allLockFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
return;
}
List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream()
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());

sortedMetadataFileList.removeAll(metadataFilesToBeDeleted);
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -75,7 +76,7 @@ public void release(LockInfo lockInfo) throws IOException {
}
}

public String fetchLock(String filenamePrefix, String acquirerId) throws IOException {
public String fetchLockedMetadataFile(String filenamePrefix, String acquirerId) throws IOException {
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(filenamePrefix);
List<String> lockFilesForAcquirer = lockFiles.stream()
.filter(lockFile -> acquirerId.equals(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lockFile)))
Expand All @@ -88,6 +89,11 @@ public String fetchLock(String filenamePrefix, String acquirerId) throws IOExcep
return lockFilesForAcquirer.get(0);
}

public Set<String> fetchLockedMetadataFiles(String filenamePrefix) throws IOException {
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(filenamePrefix);
return lockFiles.stream().map(FileLockInfo.LockFileUtils::getFileToLockNameFromLock).collect(Collectors.toSet());
}

/**
* Checks whether a given file have any lock on it or not.
* @param lockInfo File Lock Info instance for which we need to check if lock is acquired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,53 @@ public void testDeleteStaleCommitsActualDelete() throws Exception {
verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
}

public void testDeleteStaleCommitsActualDeleteWithLocks() throws Exception {
Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata();
remoteSegmentStoreDirectory.init();

// Locking one of the metadata files to ensure that it is not getting deleted.
when(mdLockManager.fetchLockedMetadataFiles(any())).thenReturn(Set.of(metadataFilename2));

// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(1);

for (String metadata : metadataFilenameContentMapping.get(metadataFilename3).values()) {
String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1];
verify(remoteDataDirectory).deleteFile(uploadedFilename);
}
assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory).deleteFile(metadataFilename3);
verify(remoteMetadataDirectory, times(0)).deleteFile(metadataFilename2);
}

public void testDeleteStaleCommitsNoDeletesDueToLocks() throws Exception {
remoteSegmentStoreDirectory.init();

// Locking all the old metadata files to ensure that none of the segment files are getting deleted.
when(mdLockManager.fetchLockedMetadataFiles(any())).thenReturn(Set.of(metadataFilename2, metadataFilename3));

// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(1);

assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true)));
verify(remoteMetadataDirectory, times(0)).deleteFile(any());
}

public void testDeleteStaleCommitsExceptionWhileFetchingLocks() throws Exception {
remoteSegmentStoreDirectory.init();

// Locking one of the metadata files to ensure that it is not getting deleted.
when(mdLockManager.fetchLockedMetadataFiles(any())).thenThrow(new RuntimeException("Rate limit exceeded"));

// popluateMetadata() adds stub to return 3 metadata files
// We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted
remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(1);

verify(remoteMetadataDirectory, times(0)).deleteFile(any());
}

public void testDeleteStaleCommitsDeleteDedup() throws Exception {
Map<String, Map<String, String>> metadataFilenameContentMapping = new HashMap<>(populateMetadata());
metadataFilenameContentMapping.put(metadataFilename4, metadataFilenameContentMapping.get(metadataFilename3));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;

import junit.framework.TestCase;

Expand Down Expand Up @@ -96,4 +97,26 @@ public void testIsAcquiredExceptionCase() { // metadata file is not passed durin
FileLockInfo testLockInfo = FileLockInfo.getLockInfoBuilder().withAcquirerId(testAcquirerId).build();
assertThrows(IllegalArgumentException.class, () -> remoteStoreMetadataLockManager.isAcquired(testLockInfo));
}

public void testFetchLocksEmpty() throws IOException {
when(lockDirectory.listFilesByPrefix("metadata")).thenReturn(Set.of());
assertEquals(0, remoteStoreMetadataLockManager.fetchLockedMetadataFiles("metadata").size());
}

public void testFetchLocksNonEmpty() throws IOException {
String metadata1 = "metadata_1_2_3";
String metadata2 = "metadata_4_5_6";
when(lockDirectory.listFilesByPrefix("metadata")).thenReturn(
Set.of(
FileLockInfo.LockFileUtils.generateLockName(metadata1, "snapshot1"),
FileLockInfo.LockFileUtils.generateLockName(metadata2, "snapshot2")
)
);
assertEquals(Set.of(metadata1, metadata2), remoteStoreMetadataLockManager.fetchLockedMetadataFiles("metadata"));
}

public void testFetchLocksException() throws IOException {
when(lockDirectory.listFilesByPrefix("metadata")).thenThrow(new IOException("Something went wrong"));
assertThrows(IOException.class, () -> remoteStoreMetadataLockManager.fetchLockedMetadataFiles("metadata"));
}
}

0 comments on commit a5566fb

Please sign in to comment.