Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Translog] Trimming based on remote segment upload and cleaning older tlog files #5662

Merged
merged 8 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
store.decRef();
}
}

@Override
public void setMinSeqNoToKeep(long seqNo) {}
};
} catch (IOException ex) {
throw new RuntimeException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ public void afterRefresh(boolean didRefresh) {
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine())
.lastRefreshedCheckpoint();
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
}
}
} catch (EngineException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ public void ensureCanFlush() {
}
}

@Override
public void setMinSeqNoToKeep(long seqNo) {
translog.setMinSeqNoToKeep(seqNo);
}

/**
* Reads operations from the translog
* @param location location of translog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T
@Override
public void ensureCanFlush() {}

@Override
public void setMinSeqNoToKeep(long seqNo) {}

@Override
public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class RemoteFsTranslog extends Translog {
private final FileTransferTracker fileTransferTracker;
private volatile long maxRemoteTranslogGenerationUploaded;

private volatile long minSeqNoToKeep;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of 0 should be good when minSeqNoToKeep has not been set ever?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could there be a case when a replica-primary promotion has happened and this has not been initialised or primary-primary relocation (peer recovery) in which case this can be problematic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In worst case, the default value of 0 would prevent further truncations. There would be some assertion failures , which would be disabled in prod.


public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
Expand Down Expand Up @@ -282,4 +284,42 @@ public void close() throws IOException {
}
}
}

protected long getMinReferencedGen() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
long minReferencedGen = Math.min(
deletionPolicy.minTranslogGenRequired(readers, current),
minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoToKeep), current, readers)
);
assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] but the lowest gen available is ["
+ getMinFileGeneration()
+ "]";
assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] which is higher than the current generation ["
+ currentFileGeneration()
+ "]";
return minReferencedGen;
}

protected void setMinSeqNoToKeep(long seqNo) {
if (seqNo < this.minSeqNoToKeep) {
throw new IllegalArgumentException(
"min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]"
);
}
this.minSeqNoToKeep = seqNo;
Comment on lines +307 to +313
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding min should be able to go lower right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, once it is set to x +10, then we should not be able to set it to x ,as 0..x operations could have been deleted by then . We can set it to x+20 after that .

}

@Override
void deleteReaderFiles(TranslogReader reader) {
try {
translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation);
} catch (IOException ignored) {
logger.error("Exception {} while deleting generation {}", ignored, reader.generation);
}
super.deleteReaderFiles(reader);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If delete in remote fails, do we still want to delete local files?

Copy link
Collaborator Author

@gbbafna gbbafna Jan 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we don't want to hold on to local files and filling the disk. We can have a background thread however to clean up the crumbs left here and also have stats around this. I will create an issue and update here.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1684,7 +1684,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
}
}

private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List<TranslogReader> readers) {
static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List<TranslogReader> readers) {
long minGen = writer.generation;
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
Expand Down Expand Up @@ -1781,7 +1781,7 @@ public void trimUnreferencedReaders() throws IOException {
}
}

private long getMinReferencedGen() throws IOException {
protected long getMinReferencedGen() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
long minReferencedGen = Math.min(
deletionPolicy.minTranslogGenRequired(readers, current),
Expand All @@ -1800,6 +1800,12 @@ private long getMinReferencedGen() throws IOException {
return minReferencedGen;
}

/*
Min Seq number required in translog to restore the complete data .
This might be required when segments are persisted via other mechanism than flush.
*/
protected void setMinSeqNoToKeep(long seqNo) {}

/**
* deletes all files associated with a reader. package-private to be able to simulate node failures at this point
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,11 @@ public interface TranslogManager {
* Checks if the translog has a pending recovery
*/
void ensureCanFlush();

/**
*
* @param seqNo : operations greater or equal to seqNo should be persisted
* This might be required when segments are persisted via other mechanism than flush.
*/
void setMinSeqNoToKeep(long seqNo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
*
* Note: any exception during the sync process will be interpreted as a tragic exception and the writer will be closed before
* raising the exception.
* @return
* @return <code>true</code> if this call caused an actual sync operation
*/
public boolean sync() throws IOException {
return syncUpTo(Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -75,6 +76,11 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
}

@Override
public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException {
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
}

@Override
public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
add(fileSnapshot.getName(), TransferState.FAILED);
}

@Override
public void onDelete(String name) {
fileTransferTracker.remove(name);
}

public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
return original.stream()
.filter(fileSnapshot -> fileTransferTracker.get(fileSnapshot.getName()) != TransferState.SUCCESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -42,6 +43,8 @@ void uploadBlobAsync(
*/
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath) throws IOException;

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

/**
* Lists the files
* @param path : the path to list
Expand All @@ -52,8 +55,8 @@ void uploadBlobAsync(

/**
*
* @param path
* @param fileName
* @param path the remote path from where download should be made
* @param fileName the name of the file
* @return inputstream of the remote file
* @throws IOException the exception while reading the data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,15 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
translogTransferMetadata.getPrimaryTerm()
);
}

public void deleteTranslog(long primaryTerm, long generation) throws IOException {
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
// ToDo - Take care of metadata file cleanup
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a backlog issue : #5677

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to be tracking checkpoint files or only translog files, since we only care about the latest checkpoint?

// https://github.com/opensearch-project/OpenSearch/issues/5677
fileTransferTracker.onDelete(ckpFileName);
fileTransferTracker.onDelete(translogFilename);
List<String> files = List.of(ckpFileName, translogFilename);
transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

/**
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot}
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} or deletion of file
*
* @opensearch.internal
*/
Expand All @@ -29,4 +29,6 @@ public interface FileTransferListener {
* @param e the exception while processing the {@link TransferFileSnapshot}
*/
void onFailure(TransferFileSnapshot fileSnapshot, Exception e);

void onDelete(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,13 @@ public void testSimpleOperationsUpload() throws IOException {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}

assertEquals(translog.allUploaded().size(), 4);
assertEquals(translog.allUploaded().size(), 2);

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));
assertEquals(translog.allUploaded().size(), 6);
assertEquals(translog.allUploaded().size(), 4);

translog.rollGeneration();
assertEquals(translog.allUploaded().size(), 6);
assertEquals(translog.allUploaded().size(), 4);

Set<String> mdFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
Expand Down Expand Up @@ -495,6 +495,38 @@ public void testSimpleOperationsUpload() throws IOException {
assertArrayEquals(ckp, content);
}
}

// expose the new checkpoint (simulating a commit), before we trim the translog
translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0);
// simulating the remote segment upload .
translog.setMinSeqNoToKeep(0);
// This should not trim anything
translog.trimUnreferencedReaders();
assertEquals(translog.allUploaded().size(), 4);
assertEquals(
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size(),
4
);

// This should trim tlog-2.* files as it contains seq no 0
translog.setMinSeqNoToKeep(1);
translog.trimUnreferencedReaders();
assertEquals(translog.allUploaded().size(), 2);
assertEquals(
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size(),
2
);

}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
Expand Down Expand Up @@ -684,6 +716,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
// expose the new checkpoint (simulating a commit), before we trim the translog
lastCommittedLocalCheckpoint.set(localCheckpoint);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translog.setMinSeqNoToKeep(localCheckpoint + 1);
translog.trimUnreferencedReaders();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {
public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
fileTransferFailed.incrementAndGet();
}

@Override
public void onDelete(String name) {}
}
);

Expand Down