Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to skip pinned timestamp in remote translog garbage collector #15416

Merged
merged 14 commits into from
Sep 2, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -109,13 +110,16 @@ public void cleanUp() throws Exception {
assertAcked(
client().admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN).get()
);
assertBusy(() -> {
try {
assertEquals(0, getFileCount(translogRepoPath));
} catch (IOException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
// With pinned timestamp, we can have tlog files even after deletion.
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
assertBusy(() -> {
try {
assertEquals(0, getFileCount(translogRepoPath));
} catch (IOException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
}
super.teardown();
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
*/
public static long parseIdFromFileName(Path translogFile) {
    // Only the file-name element of the path is parsed; the String overload does the actual work.
    return parseIdFromFileName(translogFile.getFileName().toString());
}

public static long parseIdFromFileName(String fileName) {
final Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(fileName);
if (matcher.matches()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
Expand Down Expand Up @@ -337,35 +339,54 @@
}
}

/**
 * Reads the translog metadata file that matches the given pinned timestamp.
 * <p>
 * For a non-positive timestamp this simply delegates to {@link #readMetadata()}. Otherwise all
 * listed metadata file names are filtered through
 * {@code RemoteStoreUtils.getPinnedTimestampLockedFiles} and the single matching file is selected;
 * the selector yields {@code null} when no file matches.
 *
 * @param pinnedTimestamp timestamp used to select the metadata file
 * @return the metadata produced by the shared read path for the selected file
 * @throws IOException if the underlying read fails
 */
public TranslogTransferMetadata readMetadata(long pinnedTimestamp) throws IOException {
    // Non-positive timestamps fall back to the regular lookup.
    if (pinnedTimestamp <= 0) {
        return readMetadata();
    }
    return readMetadata(blobs -> {
        List<String> candidateNames = blobs.stream().map(BlobMetadata::name).collect(Collectors.toList());
        Set<String> lockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
            candidateNames,
            Set.of(pinnedTimestamp),
            // The timestamp token is the fourth separator-delimited part of the file name, stored inverted.
            name -> RemoteStoreUtils.invertLong(name.split(METADATA_SEPARATOR)[3]),
            TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen,
            true
        );
        if (lockedFiles.isEmpty() == false) {
            assert lockedFiles.size() == 1 : "There should be only 1 metadata file matching given timestamp";
            return lockedFiles.stream().findFirst().get();
        }
        return null;
    }, Integer.MAX_VALUE);
}

/**
 * Reads the translog metadata from the first file in the listed metadata blobs, after
 * verifying via {@code RemoteStoreUtils.verifyNoMultipleWriters} that the listed files do not
 * indicate multiple writers.
 *
 * @return the metadata produced by the shared read path for the selected file
 * @throws IOException if the underlying read fails
 */
public TranslogTransferMetadata readMetadata() throws IOException {
    return readMetadata(blobs -> {
        List<String> listedNames = blobs.stream().map(BlobMetadata::name).collect(Collectors.toList());
        RemoteStoreUtils.verifyNoMultipleWriters(listedNames, TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen);
        // The first entry of the sorted listing is the file to read.
        return blobs.get(0).name();
    }, METADATA_FILES_TO_FETCH);
}

private TranslogTransferMetadata readMetadata(Function<List<BlobMetadata>, String> getMetadataFileToRead, int numberOfFilesToFetch)
throws IOException {
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
RemoteStoreUtils.verifyNoMultipleWriters(
blobMetadataList.stream().map(BlobMetadata::name).collect(Collectors.toList()),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
);
String filename = blobMetadataList.get(0).name();
boolean downloadStatus = false;
long downloadStartTime = System.nanoTime(), bytesToRead = 0;
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
downloadStatus = true;
String filename = getMetadataFileToRead.apply(blobMetadataList);
if (filename == null) {
return;
}
try {
metadataSetOnce.set(readMetadata(filename));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
}, e -> {
if (e instanceof RuntimeException) {
Expand All @@ -381,12 +402,14 @@
transferService.listAllInSortedOrder(
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
METADATA_FILES_TO_FETCH,
numberOfFilesToFetch,
latchedActionListener
);
latch.await();
if (latch.await(remoteStoreSettings.getClusterRemoteTranslogTransferTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
throw new RuntimeException("Timed out reading metadata file");

Check warning on line 409 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java#L409

Added line #L409 was not covered by tests
}
} catch (InterruptedException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
throw new IOException("Exception while reading/downloading metadata file", e);

Check warning on line 412 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java#L412

Added line #L412 was not covered by tests
}

if (exceptionSetOnce.get() != null) {
Expand All @@ -396,6 +419,26 @@
return metadataSetOnce.get();
}

/**
 * Downloads the named metadata file from {@code remoteMetadataTransferPath} and deserializes it.
 * <p>
 * Download time is always recorded on the transfer tracker; downloaded bytes are recorded only
 * when the file was read and parsed successfully.
 *
 * @param metadataFilename name of the metadata blob to download
 * @return the parsed translog transfer metadata
 * @throws IOException if downloading or reading the blob fails
 */
public TranslogTransferMetadata readMetadata(String metadataFilename) throws IOException {
    boolean downloadStatus = false;
    TranslogTransferMetadata translogTransferMetadata = null;
    long downloadStartTime = System.nanoTime(), bytesRead = 0;
    try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, metadataFilename)) {
        byte[] metadataBytes = inputStream.readAllBytes();
        // Count the bytes actually read: InputStream.available() is only an estimate of what can be
        // read without blocking and may under-report (or be 0) for remote streams.
        bytesRead = metadataBytes.length;
        IndexInput indexInput = new ByteArrayIndexInput("metadata file", metadataBytes);
        translogTransferMetadata = metadataStreamWrapper.readStream(indexInput);
        downloadStatus = true;
    } finally {
        remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
        logger.debug("translogMetadataDownloadStatus={}", downloadStatus);
        if (downloadStatus) {
            remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesRead);
        }
    }
    return translogTransferMetadata;
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> {
assert s instanceof TranslogFileSnapshot;
Expand Down Expand Up @@ -549,6 +592,16 @@
});
}

/**
 * Lists translog metadata files asynchronously by delegating to
 * {@code transferService.listAllInSortedOrderAsync} on {@code remoteMetadataTransferPath} with
 * {@code TranslogTransferMetadata.METADATA_PREFIX}, passing the {@code REMOTE_PURGE} thread pool name
 * and {@code Integer.MAX_VALUE} as the limit.
 *
 * @param listener receives the listed blob metadata (or the failure) from the transfer service
 */
public void listTranslogMetadataFilesAsync(ActionListener<List<BlobMetadata>> listener) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteMetadataTransferPath,
TranslogTransferMetadata.METADATA_PREFIX,
Integer.MAX_VALUE,
listener
);
}

public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
try {
transferService.listAllInSortedOrderAsync(
Expand Down Expand Up @@ -635,7 +688,7 @@
* @param files list of metadata files to be deleted.
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
public void deleteMetadataFilesAsync(List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, files, new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.index.translog.transfer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.common.SetOnce;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.remote.RemoteStoreUtils;
Expand All @@ -25,6 +29,8 @@
*/
public class TranslogTransferMetadata {

public static final Logger logger = LogManager.getLogger(TranslogTransferMetadata.class);

private final long primaryTerm;

private final long generation;
Expand Down Expand Up @@ -128,6 +134,24 @@ public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename
return new Tuple<>(primaryTermAndGen, nodeId);
}

/**
 * Extracts the min and max translog generation encoded in a metadata file name.
 *
 * @param filename metadata file name, split on {@code METADATA_SEPARATOR}
 * @return a tuple of (min generation, max generation), or {@code null} when the name has fewer
 *         than 7 parts or a generation token cannot be parsed
 */
public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String filename) {
    final String[] parts = filename.split(METADATA_SEPARATOR);
    // For versions < 2.17, we don't have min translog generation.
    if (parts.length < 7) {
        return null;
    }
    assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
    try {
        // Min generation is addressed from the end so that a separator inside the node id cannot
        // shift its position; max generation sits at a fixed index from the start.
        final String invertedMinGen = parts[parts.length - 2];
        final String invertedMaxGen = parts[2];
        return new Tuple<>(RemoteStoreUtils.invertLong(invertedMinGen), RemoteStoreUtils.invertLong(invertedMaxGen));
    } catch (NumberFormatException e) {
        logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
        return null;
    }
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ public static TimeValue getPinnedTimestampsLookbackInterval() {
return pinnedTimestampsLookbackInterval;
}

/**
 * Overwrites the static {@code pinnedTimestampsLookbackInterval} held by {@code RemoteStoreSettings}.
 * Visible for testing.
 *
 * @param pinnedTimestampsLookbackInterval the lookback interval to store
 */
public static void setPinnedTimestampsLookbackInterval(TimeValue pinnedTimestampsLookbackInterval) {
RemoteStoreSettings.pinnedTimestampsLookbackInterval = pinnedTimestampsLookbackInterval;
}

/**
 * Returns the current value of the static {@code isPinnedTimestampsEnabled} flag.
 *
 * @return {@code true} if the flag is set, {@code false} otherwise
 */
public static boolean isPinnedTimestampsEnabled() {
return isPinnedTimestampsEnabled;
}
Expand Down
Loading
Loading