Skip to content

Commit

Permalink
Add support to skip pinned timestamp in remote translog garbage colle…
Browse files Browse the repository at this point in the history
…ctor (opensearch-project#15416)

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored and akolarkunnu committed Sep 10, 2024
1 parent 072b238 commit 611017e
Show file tree
Hide file tree
Showing 9 changed files with 1,441 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -109,13 +110,16 @@ public void cleanUp() throws Exception {
assertAcked(
client().admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN).get()
);
assertBusy(() -> {
try {
assertEquals(0, getFileCount(translogRepoPath));
} catch (IOException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
// With pinned timestamp, we can have tlog files even after deletion.
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
assertBusy(() -> {
try {
assertEquals(0, getFileCount(translogRepoPath));
} catch (IOException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
}
super.teardown();
}

Expand Down
369 changes: 333 additions & 36 deletions server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
*/
public static long parseIdFromFileName(Path translogFile) {
final String fileName = translogFile.getFileName().toString();
return parseIdFromFileName(fileName);
}

public static long parseIdFromFileName(String fileName) {
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -337,35 +339,54 @@ private void deleteFileIfExists(Path filePath) throws IOException {
}
}

public TranslogTransferMetadata readMetadata(long pinnedTimestamp) throws IOException {
if (pinnedTimestamp <= 0) {
return readMetadata();
}
return readMetadata((blobMetadataList) -> {
List<String> metadataFiles = blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList());
Set<String> metadataFilesMatchingTimestamp = RemoteStoreUtils.getPinnedTimestampLockedFiles(
metadataFiles,
Set.of(pinnedTimestamp),
file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen,
true
);
if (metadataFilesMatchingTimestamp.isEmpty()) {
return null;
}
assert metadataFilesMatchingTimestamp.size() == 1 : "There should be only 1 metadata file matching given timestamp";
return metadataFilesMatchingTimestamp.stream().findFirst().get();
}, Integer.MAX_VALUE);
}

public TranslogTransferMetadata readMetadata() throws IOException {
return readMetadata((blobMetadataList) -> {
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
return blobMetadataList.get(0).name();
}, METADATA_FILES_TO_FETCH);
}

private TranslogTransferMetadata readMetadata(Function<List<BlobMetadata>, String> getMetadataFileToRead, int numberOfFilesToFetch)
throws IOException {
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
String filename = blobMetadataList.get(0).name();
boolean downloadStatus = false;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
downloadStatus = true;
String filename = getMetadataFileToRead.apply(blobMetadataList);
if (filename == null) {
return;
}
try {
metadataSetOnce.set(readMetadata(filename));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
}, e -> {
if (e instanceof RuntimeException) {
Expand All @@ -381,12 +402,14 @@ public TranslogTransferMetadata readMetadata() throws IOException {
transferService.listAllInSortedOrder(
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
METADATA_FILES_TO_FETCH,
numberOfFilesToFetch,
latchedActionListener
);
latch.await();
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
throw new RuntimeException("Timed out reading metadata file");
}
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
throw new IOException("Exception while reading/downloading metadata file", e);
}

if (exceptionSetOnce.get() != null) {
Expand All @@ -396,6 +419,26 @@ public TranslogTransferMetadata readMetadata() throws IOException {
return metadataSetOnce.get();
}

public TranslogTransferMetadata readMetadata(String metadataFilename) throws IOException {
boolean downloadStatus = false;
TranslogTransferMetadata translogTransferMetadata = null;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, metadataFilename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
translogTransferMetadata = metadataStreamWrapper.readStream(indexInput);
downloadStatus = true;
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
return translogTransferMetadata;
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> {
assert s instanceof TranslogFileSnapshot;
Expand Down Expand Up @@ -549,6 +592,16 @@ public void onFailure(Exception e) {
});
}

public void listTranslogMetadataFilesAsync(ActionListener<List<BlobMetadata>> listener) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
Integer.MAX_VALUE,
listener
);
}

public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(
Expand Down Expand Up @@ -635,7 +688,7 @@ public void onFailure(Exception e) {
* @param files list of metadata files to be deleted.
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
public void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, files, new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.index.translog.transfer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.common.SetOnce;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.remote.RemoteStoreUtils;
Expand All @@ -25,6 +29,8 @@
*/
public class TranslogTransferMetadata {

public static final Logger logger = LogManager.getLogger(TranslogTransferMetadata.class);

private final long primaryTerm;

private final long generation;
Expand Down Expand Up @@ -128,6 +134,24 @@ public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename
return new Tuple<>(primaryTermAndGen, nodeId);
}

public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 7) {
// For versions < 2.17, we don't have min translog generation.
return null;
}
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of direct index, we go backwards to avoid running into same separator in nodeId
String minGeneration = tokens[tokens.length - 2];
String maxGeneration = tokens[2];
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
} catch (NumberFormatException e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
return null;
}
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ public static TimeValue getPinnedTimestampsLookbackInterval() {
return pinnedTimestampsLookbackInterval;
}

// Visible for testing
public static void setPinnedTimestampsLookbackInterval(TimeValue pinnedTimestampsLookbackInterval) {
RemoteStoreSettings.pinnedTimestampsLookbackInterval = pinnedTimestampsLookbackInterval;
}

public static boolean isPinnedTimestampsEnabled() {
return isPinnedTimestampsEnabled;
}
Expand Down
Loading

0 comments on commit 611017e

Please sign in to comment.