Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Update Translog Metadata file name #8350

Merged
merged 6 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ public void trimUnreferencedReaders() throws IOException {
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
deleteStaleRemotePrimaryTermsAndMetadataFiles();
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
deleteStaleRemotePrimaryTerms();
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
Expand All @@ -409,7 +410,7 @@ private void deleteRemoteGeneration(Set<Long> generations) {
* <br>
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator.
*/
private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
private void deleteStaleRemotePrimaryTerms() {
// The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there
// are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part
// of older primary term.
Expand All @@ -418,8 +419,6 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
assert readers.isEmpty() == false : "Expected non-empty readers";
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
// Second we delete all stale metadata files from remote store
translogTransferManager.deleteStaleTranslogMetadataFilesAsync();
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand All @@ -23,6 +24,8 @@
import java.util.List;
import java.util.Set;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;

/**
* Service that handles remote transfer of translog and checkpoint files
*
Expand Down Expand Up @@ -114,17 +117,6 @@ public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
}

@Override
public void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
listener.onResponse(listAll(path));
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
Expand All @@ -140,4 +132,18 @@ public void listFoldersAsync(String threadpoolName, Iterable<String> path, Actio
}
});
}

public void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) {
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener);
}

public void listAllInSortedOrderAsync(
String threadpoolName,
Iterable<String> path,
int limit,
ActionListener<List<BlobMetadata>> listener
) {
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, limit, listener); });
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.IOException;
Expand Down Expand Up @@ -80,14 +81,6 @@ void uploadBlobAsync(
*/
Set<String> listAll(Iterable<String> path) throws IOException;

/**
* Lists the files and invokes the listener on success or failure
* @param threadpoolName threadpool type which will be used to list all files asynchronously.
* @param path the path to list
* @param listener the callback to be invoked once list operation completes successfully/fails.
*/
void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener);

/**
* Lists the folders inside the path.
* @param path : the path
Expand All @@ -114,4 +107,8 @@ void uploadBlobAsync(
*/
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;

void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);

void listAllInSortedOrderAsync(String threadpoolName, Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
Expand Down Expand Up @@ -42,8 +44,6 @@

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -185,15 +185,39 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return metadataStreamWrapper.readStream(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
}
}).orElse(null);
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
String filename = blobMetadataList.get(0).name();
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
}
}, e -> {
logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e);
exceptionSetOnce.set((IOException) e);
}),
latch
);

try {
transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener);
latch.await();
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
}

if (exceptionSetOnce.get() != null) {
throw exceptionSetOnce.get();
}

return metadataSetOnce.get();
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Expand All @@ -211,7 +235,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));

return new TransferFileSnapshot(
getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()),
translogTransferMetadata.getFileName(),
getMetadataBytes(translogTransferMetadata),
translogTransferMetadata.getPrimaryTerm()
);
Expand All @@ -230,7 +254,7 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + metadata.getPrimaryTerm(),
getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()),
metadata.getFileName(),
output,
TranslogTransferMetadata.BUFFER_SIZE
)
Expand All @@ -253,20 +277,14 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = new ArrayList<>();
List<String> metadataFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
translogFiles.addAll(List.of(ckpFileName, translogFileName));
// Add metadata file tio metadata file list which is located in basePath/metadata
String metadataFileName = TranslogTransferMetadata.getFileName(primaryTerm, generation);
metadataFiles.add(metadataFileName);
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
// Delete the metadata files asynchronously
deleteMetadataFilesAsync(metadataFiles, onCompletion);
}

/**
Expand Down Expand Up @@ -341,24 +359,37 @@ public void onFailure(Exception e) {
});
}

public void deleteStaleTranslogMetadataFilesAsync() {
transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> metadataFiles) {
List<String> sortedMetadataFiles = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(0, sortedMetadataFiles.size() - 1);
deleteMetadataFilesAsync(metadataFilesToDelete);
}
public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
onCompletion.run();
return;
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
}
});
@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
onCompletion.run();
}
}
);
} catch (Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
onCompletion.run();
}
}

public void deleteTranslogFiles() throws IOException {
Expand Down Expand Up @@ -407,14 +438,6 @@ public void onFailure(Exception e) {
}
}

/**
* Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool.
* @param metadataFilesToDelete list of metadata files to be deleted.
*/
private void deleteMetadataFilesAsync(List<String> metadataFilesToDelete) {
deleteMetadataFilesAsync(metadataFilesToDelete, () -> {});
}

/**
* Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. On success or failure, runs {@code onCompletion}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.common.SetOnce;
import org.opensearch.index.remote.RemoteStoreUtils;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -42,13 +42,14 @@ public class TranslogTransferMetadata {

static final String METADATA_CODEC = "md";

public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();
private final long createdAt;

public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) {
this.primaryTerm = primaryTerm;
this.generation = generation;
this.minTranslogGeneration = minTranslogGeneration;
this.count = count;
this.createdAt = System.currentTimeMillis();
}

public long getPrimaryTerm() {
Expand All @@ -75,8 +76,19 @@ public Map<String, String> getGenerationToPrimaryTermMapper() {
return generationToPrimaryTermMapper.get();
}

public static String getFileName(long primaryTerm, long generation) {
return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation)));
/*
This should be used only at the time of creation.
*/
public String getFileName() {
return String.join(
METADATA_SEPARATOR,
Arrays.asList(
RemoteStoreUtils.invertLong(primaryTerm),
RemoteStoreUtils.invertLong(generation),
RemoteStoreUtils.invertLong(createdAt),
String.valueOf(CURRENT_VERSION)
)
);
}

@Override
Expand All @@ -91,22 +103,4 @@ public boolean equals(Object o) {
TranslogTransferMetadata other = (TranslogTransferMetadata) o;
return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation);
}

private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
// Format of metadata filename is <Primary Term>__<Generation>
String[] filenameTokens1 = first.split(METADATA_SEPARATOR);
String[] filenameTokens2 = second.split(METADATA_SEPARATOR);
// Here, we are comparing only primary term and generation.
for (int i = 0; i < filenameTokens1.length; i++) {
if (filenameTokens1[i].equals(filenameTokens2[i]) == false) {
return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i]));
}
}
throw new IllegalArgumentException(
"TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation"
);
}
}
}
Loading