Skip to content

Commit

Permalink
More changes while reading md file and UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Jul 3, 2023
1 parent 3178356 commit 3931db0
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static void initializeFeatureFlags(Settings openSearchSettings) {
* and false otherwise.
*/
public static boolean isEnabled(String featureFlagName) {
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) {
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName)) || featureFlagName.equals(REMOTE_STORE)) {
// TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public void trimUnreferencedReaders() throws IOException {
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
deleteStaleRemotePrimaryTermsAndMetadataFiles();
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
Expand Down Expand Up @@ -418,8 +419,6 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
assert readers.isEmpty() == false : "Expected non-empty readers";
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
// Second we delete all stale metadata files from remote store
translogTransferManager.deleteStaleTranslogMetadataFilesAsync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,6 @@ public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
}

@Override
public void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
listener.onResponse(listAll(path));
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
Expand All @@ -142,8 +131,18 @@ public void listFoldersAsync(String threadpoolName, Iterable<String> path, Actio
});
}

public void listBlobsInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException {
public void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException {
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInLexicographicOrder("", limit, listener);
}

public void listAllInSortedOrderAsync(String threadpoolName, Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
listAllInSortedOrder(path, limit, listener);
} catch (IOException e) {
listener.onFailure(e);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,6 @@ void uploadBlobAsync(
*/
Set<String> listAll(Iterable<String> path) throws IOException;

/**
* Lists the files and invokes the listener on success or failure
* @param threadpoolName threadpool type which will be used to list all files asynchronously.
* @param path the path to list
* @param listener the callback to be invoked once list operation completes successfully/fails.
*/
void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener);

/**
* Lists the folders inside the path.
* @param path : the path
Expand All @@ -115,6 +107,8 @@ void uploadBlobAsync(
*/
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;

void listBlobsInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException;
void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException;

void listAllInSortedOrderAsync(String threadpoolName, Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -209,7 +207,7 @@ public TranslogTransferMetadata readMetadata() throws IOException {
);

try {
transferService.listBlobsInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener);
transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener);
latch.await();
} catch (InterruptedException | IOException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
Expand Down Expand Up @@ -237,7 +235,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));

return new TransferFileSnapshot(
getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()),
translogTransferMetadata.getFileName(),
getMetadataBytes(translogTransferMetadata),
translogTransferMetadata.getPrimaryTerm()
);
Expand All @@ -256,7 +254,7 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + metadata.getPrimaryTerm(),
getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()),
metadata.getFileName(),
output,
TranslogTransferMetadata.BUFFER_SIZE
)
Expand All @@ -279,20 +277,14 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = new ArrayList<>();
List<String> metadataFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
translogFiles.addAll(List.of(ckpFileName, translogFileName));
// Add metadata file tio metadata file list which is located in basePath/metadata
String metadataFileName = TranslogTransferMetadata.getFileName(primaryTerm, generation);
metadataFiles.add(metadataFileName);
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
// Delete the metadata files asynchronously
deleteMetadataFilesAsync(metadataFiles, onCompletion);
}

/**
Expand Down Expand Up @@ -367,24 +359,29 @@ public void onFailure(Exception e) {
});
}

public void deleteStaleTranslogMetadataFilesAsync() {
transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> metadataFiles) {
List<String> sortedMetadataFiles = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
return;
public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, Integer.MAX_VALUE, new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(0, sortedMetadataFiles.size() - 1);
deleteMetadataFilesAsync(metadataFilesToDelete);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
}
});
@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
}
});
}
catch (Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
}
}

public void deleteTranslogFiles() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.common.SetOnce;
import org.opensearch.index.remote.RemoteStoreUtils;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -42,13 +42,15 @@ public class TranslogTransferMetadata {

static final String METADATA_CODEC = "md";

public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();
private final long createdAt;


public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) {
this.primaryTerm = primaryTerm;
this.generation = generation;
this.minTranslogGeneration = minTranslogGeneration;
this.count = count;
this.createdAt = System.currentTimeMillis();
}

public long getPrimaryTerm() {
Expand All @@ -75,13 +77,16 @@ public Map<String, String> getGenerationToPrimaryTermMapper() {
return generationToPrimaryTermMapper.get();
}

public static String getFileName(long primaryTerm, long generation) {
/*
This should be used only at the time of creation.
*/
public String getFileName() {
return String.join(
METADATA_SEPARATOR,
Arrays.asList(
String.valueOf(Long.MAX_VALUE - primaryTerm),
String.valueOf(Long.MAX_VALUE - generation),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()),
RemoteStoreUtils.invertLong(primaryTerm),
RemoteStoreUtils.invertLong(generation),
RemoteStoreUtils.invertLong(createdAt),
String.valueOf(primaryTerm),
String.valueOf(generation)
)
Expand All @@ -100,22 +105,4 @@ public boolean equals(Object o) {
TranslogTransferMetadata other = (TranslogTransferMetadata) o;
return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation);
}

private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
// Format of metadata filename is <Inv Primary Term>__<Inv Generation>__<Inv Timestamp>__<Primary Term>__<Generation>
String[] filenameTokens1 = first.split(METADATA_SEPARATOR);
String[] filenameTokens2 = second.split(METADATA_SEPARATOR);
// Here, we are comparing only inverted primary term and inv generation.
for (int i = 0; i < 2; i++) {
if (filenameTokens1[i].equals(filenameTokens2[i]) == false) {
return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i]));
}
}
throw new IllegalArgumentException(
"TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ public void testMetadataFileDeletion() throws Exception {
assertEquals(1, translog.readers.size());
}
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
int moreDocs = randomIntBetween(3, 10);
logger.info("numDocs={} moreDocs={}", numDocs, moreDocs);
for (int i = numDocs; i < numDocs + moreDocs; i++) {
Expand All @@ -579,7 +579,7 @@ public void testMetadataFileDeletion() throws Exception {
translog.trimUnreferencedReaders();
assertEquals(1 + moreDocs, translog.readers.size());
assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
assertBusy(() -> assertEquals(1 , blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

int totalDocs = numDocs + moreDocs;
translog.setMinSeqNoToKeep(totalDocs - 1);
Expand All @@ -592,7 +592,7 @@ public void testMetadataFileDeletion() throws Exception {
);
translog.setMinSeqNoToKeep(totalDocs);
translog.trimUnreferencedReaders();
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

// Change primary term and test the deletion of older primaries
String translogUUID = translog.translogUUID;
Expand All @@ -607,10 +607,6 @@ public void testMetadataFileDeletion() throws Exception {
long oldPrimaryTerm = primaryTerm.get();
long newPrimaryTerm = primaryTerm.incrementAndGet();

// Check all metadata files corresponds to old primary term
Set<String> mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__"))));

// Creating RemoteFsTranslog with the same location
Translog newTranslog = create(translogDir, repository, translogUUID);
int newPrimaryTermDocs = randomIntBetween(5, 10);
Expand All @@ -621,10 +617,6 @@ public void testMetadataFileDeletion() throws Exception {
newTranslog.trimUnreferencedReaders();
}

// Check that all metadata files are belonging now to the new primary
mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__"))));

try {
newTranslog.close();
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 3931db0

Please sign in to comment.