Use primary term corresponding to generation while deleting
Signed-off-by: Sachin Kale <kalsac@amazon.com>
Sachin Kale committed Sep 4, 2024
1 parent e087272 commit efe0860
Showing 4 changed files with 228 additions and 52 deletions.
RemoteFsTimestampAwareTranslog.java
@@ -56,6 +56,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<Long, String> metadataFilePinnedTimestampMap;
// For metadata files, with no min generation in the name, we cache generation data to avoid multiple reads.
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
+ private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
@@ -86,6 +87,7 @@ public RemoteFsTimestampAwareTranslog(
logger = Loggers.getLogger(getClass(), shardId);
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.oldFormatMetadataFileGenerationMap = new HashMap<>();
+ this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>();
}

@Override
@@ -184,35 +186,41 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
- Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
+ Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
logger.debug(() -> "generationsToBeDeletedToPrimaryTermRangeMap = " + generationsToBeDeletedToPrimaryTermRangeMap);
if (generationsToBeDeletedToPrimaryTermRangeMap.isEmpty() == false) {
// Delete stale generations
+ Map<Long, Set<Long>> primaryTermToGenerationsMap = getPrimaryTermToGenerationsMap(
+ generationsToBeDeletedToPrimaryTermRangeMap
+ );
translogTransferManager.deleteGenerationAsync(
- primaryTermSupplier.getAsLong(),
- generationsToBeDeleted,
+ primaryTermToGenerationsMap,
remoteGenerationDeletionPermits::release
);
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);

- // Update cache to keep only those metadata files that are not getting deleted
- oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

- // Delete stale primary terms
- deleteStaleRemotePrimaryTerms(metadataFiles);
} else {
- remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
+ remoteGenerationDeletionPermits.release();
}

+ // Update cache to keep only those metadata files that are not getting deleted
+ oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

+ // Delete stale primary terms
+ deleteStaleRemotePrimaryTerms(metadataFiles);
} catch (Exception e) {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
@@ -227,8 +235,21 @@ public void onFailure(Exception e) {
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

+ protected Map<Long, Set<Long>> getPrimaryTermToGenerationsMap(Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap) {
+ Map<Long, Set<Long>> primaryTermToGenerationsMap = new HashMap<>();
+ for (Map.Entry<Long, Set<Long>> entry : generationsToBeDeletedToPrimaryTermRangeMap.entrySet()) {
+ for (Long primaryTerm : entry.getValue()) {
+ if (primaryTermToGenerationsMap.containsKey(primaryTerm) == false) {
+ primaryTermToGenerationsMap.put(primaryTerm, new HashSet<>());
+ }
+ primaryTermToGenerationsMap.get(primaryTerm).add(entry.getKey());
+ }
+ }
+ return primaryTermToGenerationsMap;
+ }
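
The helper above inverts the generation-to-primary-terms mapping so that deletions can be issued per primary term: translog and checkpoint blobs live under basePath/<primaryTerm> (per the comment in TranslogTransferManager), so each term's directory can only be purged of its own generations. As a minimal standalone sketch of the same inversion, using plain Java collections and invented sample values (nothing below is OpenSearch code):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PrimaryTermMapInversionSketch {
    public static void main(String[] args) {
        // generation -> primary terms whose directories may hold its files (invented sample)
        Map<Long, Set<Long>> generationToTermRange = new HashMap<>();
        generationToTermRange.put(5L, new HashSet<>(Set.of(1L, 2L)));
        generationToTermRange.put(6L, new HashSet<>(Set.of(2L)));

        // Invert to primary term -> generations, mirroring getPrimaryTermToGenerationsMap
        Map<Long, Set<Long>> termToGenerations = new HashMap<>();
        for (Map.Entry<Long, Set<Long>> entry : generationToTermRange.entrySet()) {
            for (Long term : entry.getValue()) {
                termToGenerations.computeIfAbsent(term, t -> new HashSet<>()).add(entry.getKey());
            }
        }
        System.out.println(termToGenerations); // {1=[5], 2=[5, 6]}
    }
}

(computeIfAbsent is equivalent to the containsKey/put sequence in the patch; both build the same map.)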

// Visible for testing
- protected Set<Long> getGenerationsToBeDeleted(
+ protected Map<Long, Set<Long>> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
boolean indexDeleted
@@ -239,24 +260,56 @@ protected Set<Long> getGenerationsToBeDeleted(
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
}

- Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
+ Map<Long, Set<String>> generationsFromMetadataFilesToBeDeleted = new HashMap<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
- generationsFromMetadataFilesToBeDeleted.addAll(
- LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())
- );
+ List<Long> generations = LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList());
+ for (Long generation : generations) {
+ if (generationsFromMetadataFilesToBeDeleted.containsKey(generation) == false) {
+ generationsFromMetadataFilesToBeDeleted.put(generation, new HashSet<>());
+ }
+ generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
+ }
}

+ for (String mdFile : metadataFilesNotToBeDeleted) {
+ Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
+ List<Long> generations = LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList());
+ for (Long generation : generations) {
+ if (generationsFromMetadataFilesToBeDeleted.containsKey(generation)) {
+ generationsFromMetadataFilesToBeDeleted.get(generation).add(mdFile);
+ }
+ }
+ }

Map<String, Tuple<Long, Long>> metadataFileNotToBeDeletedGenerationMap = getGenerationForMetadataFiles(metadataFilesNotToBeDeleted);
TreeSet<Tuple<Long, Long>> pinnedGenerations = getOrderedPinnedMetadataGenerations(metadataFileNotToBeDeletedGenerationMap);
- Set<Long> generationsToBeDeleted = new HashSet<>();
- for (long generation : generationsFromMetadataFilesToBeDeleted) {
+ Map<Long, Set<Long>> generationsToBeDeletedToPrimaryTermRangeMap = new HashMap<>();
+ for (long generation : generationsFromMetadataFilesToBeDeleted.keySet()) {
// Check if the generation is not referred by metadata file matching pinned timestamps
if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) {
- generationsToBeDeleted.add(generation);
+ generationsToBeDeletedToPrimaryTermRangeMap.put(
+ generation,
+ getPrimaryTermRange(generationsFromMetadataFilesToBeDeleted.get(generation), translogTransferManager)
+ );
}
}
+ return generationsToBeDeletedToPrimaryTermRangeMap;
+ }

+ protected Set<Long> getPrimaryTermRange(Set<String> metadataFiles, TranslogTransferManager translogTransferManager) throws IOException {
+ Tuple<Long, Long> primaryTermRange = new Tuple<>(Long.MIN_VALUE, Long.MAX_VALUE);
+ for (String metadataFile : metadataFiles) {
+ Tuple<Long, Long> primaryTermRangeForMdFile = getMinMaxPrimaryTermFromMetadataFile(metadataFile, translogTransferManager);
+ primaryTermRange = new Tuple<>(
+ Math.max(primaryTermRange.v1(), primaryTermRangeForMdFile.v1()),
+ Math.min(primaryTermRange.v2(), primaryTermRangeForMdFile.v2())
+ );
+ if (primaryTermRange.v1().equals(primaryTermRange.v2())) {
+ break;
+ }
+ }
- return generationsToBeDeleted;
+ return LongStream.rangeClosed(primaryTermRange.v1(), primaryTermRange.v2()).boxed().collect(Collectors.toSet());
}
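
getPrimaryTermRange above starts from (Long.MIN_VALUE, Long.MAX_VALUE) and repeatedly takes the maximum of the minimums and the minimum of the maximums, i.e. it narrows to the intersection of the per-file [minPrimaryTerm, maxPrimaryTerm] ranges, short-circuiting once the range collapses to a single term. A standalone sketch of that narrowing, with invented sample ranges:

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PrimaryTermRangeSketch {
    public static void main(String[] args) {
        // Per-metadata-file [min, max] primary term ranges (invented sample values)
        List<long[]> ranges = List.of(new long[] { 1, 4 }, new long[] { 2, 6 }, new long[] { 2, 5 });

        long min = Long.MIN_VALUE;
        long max = Long.MAX_VALUE;
        for (long[] range : ranges) {
            min = Math.max(min, range[0]); // narrow the lower bound
            max = Math.min(max, range[1]); // narrow the upper bound
            if (min == max) {
                break; // collapsed to a single primary term, nothing left to narrow
            }
        }
        // [1,4] intersected with [2,6] and [2,5] gives [2,4], expanded to {2, 3, 4}
        Set<Long> terms = LongStream.rangeClosed(min, max).boxed().collect(Collectors.toSet());
        System.out.println(terms);
    }
}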

// Visible for testing
@@ -351,6 +404,36 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
}
}

+ protected Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(String metadataFile, TranslogTransferManager translogTransferManager)
+ throws IOException {
+ Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
+ if (minMaxPrimaryTermFromFileName != null) {
+ return minMaxPrimaryTermFromFileName;
+ } else {
+ if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) {
+ return oldFormatMetadataFilePrimaryTermMap.get(metadataFile);
+ } else {
+ TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile);
+ long maxPrimaryTerm = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile);
+ long minPrimaryTerm = -1;
+ if (metadata.getGenerationToPrimaryTermMapper() != null
+ && metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) {
+ Optional<Long> primaryTerm = metadata.getGenerationToPrimaryTermMapper()
+ .values()
+ .stream()
+ .map(s -> Long.parseLong(s))
+ .min(Long::compareTo);
+ if (primaryTerm.isPresent()) {
+ minPrimaryTerm = primaryTerm.get();
+ }
+ }
+ Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTerm, maxPrimaryTerm);
+ oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
+ return minMaxPrimaryTermTuple;
+ }
+ }
+ }

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders as it ensures
* implicitly that minimum primary term in latest translog metadata in remote store is the current primary term.
TranslogTransferManager.java
@@ -13,6 +13,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.LatchedActionListener;
+ import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -39,6 +40,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Base64;
+ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -496,6 +498,12 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOException {
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
+ List<String> translogFiles = getTranslogFilesFromGenerations(generations);
+ // Delete the translog and checkpoint files asynchronously
+ deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
+ }

+ private List<String> getTranslogFilesFromGenerations(Set<Long> generations) {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
@@ -507,8 +515,32 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
translogFiles.add(translogFileName);
}
});
+ return translogFiles;
+ }

+ public void deleteGenerationAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, Runnable onCompletion) {
+ GroupedActionListener<Void> groupedActionListener = new GroupedActionListener<>(new ActionListener<>() {
+ @Override
+ public void onResponse(Collection<Void> unused) {
+ logger.trace(() -> "Deleted translogs for primaryTermToGenerationsMap=" + primaryTermToGenerationsMap);
+ onCompletion.run();
+ }

+ @Override
+ public void onFailure(Exception e) {
+ onCompletion.run();
+ logger.error(
+ () -> new ParameterizedMessage(
+ "Exception occurred while deleting translog for primaryTermToGenerationsMap={}",
+ primaryTermToGenerationsMap
+ ),
+ e
+ );
+ }
+ }, primaryTermToGenerationsMap.size());

// Delete the translog and checkpoint files asynchronously
- deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
+ deleteTranslogFilesAsync(primaryTermToGenerationsMap, groupedActionListener);
}
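
The new overload fans out one blob-deletion request per primary term and wraps onCompletion in a GroupedActionListener sized to the number of terms, so the completion callback runs exactly once after every per-term deletion has responded; both onResponse and onFailure run onCompletion, so the deletion permit is released whether the group succeeds or fails. A rough standalone sketch of the success-path counting behavior, not the actual OpenSearch class:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Stand-in for the success path of GroupedActionListener: the wrapped
// callback fires once, after `groupSize` onResponse calls.
class GroupedCallbackSketch<T> {
    private final AtomicInteger remaining;
    private final List<T> results = new ArrayList<>();
    private final Consumer<Collection<T>> onAllDone;

    GroupedCallbackSketch(Consumer<Collection<T>> onAllDone, int groupSize) {
        this.onAllDone = onAllDone;
        this.remaining = new AtomicInteger(groupSize);
    }

    void onResponse(T result) {
        synchronized (results) {
            results.add(result);
        }
        if (remaining.decrementAndGet() == 0) {
            onAllDone.accept(results); // runs once, e.g. releasing the deletion permit
        }
    }
}

public class DeleteGenerationSketch {
    public static void main(String[] args) {
        GroupedCallbackSketch<Void> listener = new GroupedCallbackSketch<>(
            unused -> System.out.println("all per-term deletions finished"),
            2 // primaryTermToGenerationsMap.size()
        );
        listener.onResponse(null); // deletion under primary term 1 finished
        listener.onResponse(null); // deletion under primary term 2 finished; callback fires
    }
}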

/**
@@ -683,6 +715,21 @@ public void onFailure(Exception e) {
}
}

+ private void deleteTranslogFilesAsync(Map<Long, Set<Long>> primaryTermToGenerationsMap, ActionListener<Void> actionListener) {
+ for (Long primaryTerm : primaryTermToGenerationsMap.keySet()) {
+ try {
+ transferService.deleteBlobsAsync(
+ ThreadPool.Names.REMOTE_PURGE,
+ remoteDataTransferPath.add(String.valueOf(primaryTerm)),
+ getTranslogFilesFromGenerations(primaryTermToGenerationsMap.get(primaryTerm)),
+ actionListener
+ );
+ } catch (Exception e) {
+ actionListener.onFailure(e);
+ }
+ }
+ }

/**
* Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. On success or failure, runs {@code onCompletion}.
*
TranslogTransferMetadata.java
@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
+ import java.util.Optional;

/**
* The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the
@@ -108,11 +109,28 @@ public String getFileName() {
RemoteStoreUtils.invertLong(createdAt),
String.valueOf(Objects.hash(nodeId)),
RemoteStoreUtils.invertLong(minTranslogGeneration),
+ String.valueOf(getMinPrimaryTermReferred()),
String.valueOf(CURRENT_VERSION)
)
);
}

+ private long getMinPrimaryTermReferred() {
+ if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
+ return -1;
+ }
+ Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
+ .values()
+ .stream()
+ .map(s -> Long.parseLong(s))
+ .min(Long::compareTo);
+ if (minPrimaryTerm.isPresent()) {
+ return minPrimaryTerm.get();
+ } else {
+ return -1;
+ }
+ }
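
getMinPrimaryTermReferred scans the generation-to-primary-term mapping (string keys and values, hence the Long.parseLong) for the smallest referenced term, with -1 as the sentinel when the mapping is absent or empty. A standalone sketch with invented sample values:

import java.util.Map;
import java.util.Optional;

public class MinPrimaryTermSketch {
    // Mirrors getMinPrimaryTermReferred: smallest primary term in the
    // generation -> primary term mapping, or -1 when it is absent/empty.
    static long minPrimaryTermReferred(Map<String, String> generationToPrimaryTerm) {
        if (generationToPrimaryTerm == null || generationToPrimaryTerm.values().isEmpty()) {
            return -1;
        }
        Optional<Long> min = generationToPrimaryTerm.values().stream().map(Long::parseLong).min(Long::compareTo);
        return min.orElse(-1L);
    }

    public static void main(String[] args) {
        // Invented sample: generations 12 and 13 written under term 3, generation 14 under term 4
        System.out.println(minPrimaryTermReferred(Map.of("12", "3", "13", "3", "14", "4"))); // 3
        System.out.println(minPrimaryTermReferred(Map.of())); // -1
    }
}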

public static Tuple<Tuple<Long, Long>, String> getNodeIdByPrimaryTermAndGeneration(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 6) {
@@ -143,15 +161,43 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String filename) {
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of direct index, we go backwards to avoid running into same separator in nodeId
- String minGeneration = tokens[tokens.length - 2];
+ String minGeneration = tokens[tokens.length - 3];
String maxGeneration = tokens[2];
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
- } catch (NumberFormatException e) {
+ } catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
return null;
}
}

+ public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
+ String[] tokens = filename.split(METADATA_SEPARATOR);
+ if (tokens.length < 7) {
+ // For versions < 2.17, we don't have min primary term.
+ return null;
+ }
+ assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
+ try {
+ // instead of direct index, we go backwards to avoid running into same separator in nodeId
+ String minPrimaryTerm = tokens[tokens.length - 2];
+ String maxPrimaryTerm = tokens[1];
+ return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm));
+ } catch (Exception e) {
+ logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e);
+ return null;
+ }
+ }

+ public static long getPrimaryTermFromFileName(String filename) {
+ String[] tokens = filename.split(METADATA_SEPARATOR);
+ try {
+ return RemoteStoreUtils.invertLong(tokens[1]);
+ } catch (Exception e) {
+ logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e);
+ return -1;
+ }
+ }
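
Taken together, the filename helpers read the max primary term in inverted form at tokens[1] and the newly appended min primary term as a plain long at tokens[length - 2], just before the version token. The sketch below round-trips a hypothetical file name; the leading "metadata" token, the "__" separator value, and the zero-padded Long.MAX_VALUE - n behavior of RemoteStoreUtils.invertLong are assumptions for illustration, not taken from this diff:

import java.util.Locale;

public class MetadataFilenameParsingSketch {
    // Assumed behavior of RemoteStoreUtils.invertLong: zero-padded Long.MAX_VALUE - num,
    // so that more recent terms and generations sort first lexicographically.
    static String invertLong(long num) {
        return String.format(Locale.ROOT, "%019d", Long.MAX_VALUE - num);
    }

    static long invertLong(String str) {
        return Long.MAX_VALUE - Long.parseLong(str);
    }

    public static void main(String[] args) {
        String separator = "__"; // assumed value of METADATA_SEPARATOR
        String filename = String.join(
            separator,
            "metadata",                 // hypothetical leading token
            invertLong(5),              // tokens[1]: max primary term, inverted
            invertLong(42),             // tokens[2]: max generation, inverted
            invertLong(1725400000000L), // created-at, inverted
            "12345",                    // node id hash
            invertLong(40),             // min generation, inverted
            "3",                        // tokens[length - 2]: min primary term, plain
            "2"                         // version
        );
        String[] tokens = filename.split(separator);
        System.out.println("max primary term = " + invertLong(tokens[1]));                     // 5
        System.out.println("min primary term = " + Long.parseLong(tokens[tokens.length - 2])); // 3
    }
}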

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);
[diff for the fourth changed file did not load]
