Skip to content

Commit 4ca9d4a

Browse files
sachinpkale, gbbafna
authored and committed
[Remote Translog] Trimming based on remote segment upload and cleaning older tlog files (opensearch-project#5662) (opensearch-project#5793)
* RemoteFSTranslog Trimming and GC Logic Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 3210fa8 commit 4ca9d4a

15 files changed

+140
-9
lines changed

server/src/main/java/org/opensearch/index/engine/NoOpEngine.java

+3
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
227227
store.decRef();
228228
}
229229
}
230+
231+
@Override
232+
public void setMinSeqNoToKeep(long seqNo) {}
230233
};
231234
} catch (IOException ex) {
232235
throw new RuntimeException(ex);

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+4
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ public void afterRefresh(boolean didRefresh) {
144144
.filter(file -> !localSegmentsPostRefresh.contains(file))
145145
.collect(Collectors.toSet())
146146
.forEach(localSegmentChecksumMap::remove);
147+
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine())
148+
.lastRefreshedCheckpoint();
149+
((InternalEngine) indexShard.getEngine()).translogManager()
150+
.setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
147151
}
148152
}
149153
} catch (EngineException e) {

server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java

+5
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,11 @@ public void ensureCanFlush() {
288288
}
289289
}
290290

291+
@Override
292+
public void setMinSeqNoToKeep(long seqNo) {
293+
translog.setMinSeqNoToKeep(seqNo);
294+
}
295+
291296
/**
292297
* Reads operations from the translog
293298
* @param location

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

+3
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T
9595
@Override
9696
public void ensureCanFlush() {}
9797

98+
@Override
99+
public void setMinSeqNoToKeep(long seqNo) {}
100+
98101
@Override
99102
public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
100103
return 0;

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

+40
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public class RemoteFsTranslog extends Translog {
4747
private final FileTransferTracker fileTransferTracker;
4848
private volatile long maxRemoteTranslogGenerationUploaded;
4949

50+
private volatile long minSeqNoToKeep;
51+
5052
public RemoteFsTranslog(
5153
TranslogConfig config,
5254
String translogUUID,
@@ -282,4 +284,42 @@ public void close() throws IOException {
282284
}
283285
}
284286
}
287+
288+
protected long getMinReferencedGen() throws IOException {
289+
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
290+
long minReferencedGen = Math.min(
291+
deletionPolicy.minTranslogGenRequired(readers, current),
292+
minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoToKeep), current, readers)
293+
);
294+
assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
295+
+ minReferencedGen
296+
+ "] but the lowest gen available is ["
297+
+ getMinFileGeneration()
298+
+ "]";
299+
assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of ["
300+
+ minReferencedGen
301+
+ "] which is higher than the current generation ["
302+
+ currentFileGeneration()
303+
+ "]";
304+
return minReferencedGen;
305+
}
306+
307+
protected void setMinSeqNoToKeep(long seqNo) {
308+
if (seqNo < this.minSeqNoToKeep) {
309+
throw new IllegalArgumentException(
310+
"min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]"
311+
);
312+
}
313+
this.minSeqNoToKeep = seqNo;
314+
}
315+
316+
@Override
317+
void deleteReaderFiles(TranslogReader reader) {
318+
try {
319+
translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation);
320+
} catch (IOException ignored) {
321+
logger.error("Exception {} while deleting generation {}", ignored, reader.generation);
322+
}
323+
super.deleteReaderFiles(reader);
324+
}
285325
}

server/src/main/java/org/opensearch/index/translog/Translog.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -1684,7 +1684,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
16841684
}
16851685
}
16861686

1687-
private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List<TranslogReader> readers) {
1687+
static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List<TranslogReader> readers) {
16881688
long minGen = writer.generation;
16891689
for (final TranslogReader reader : readers) {
16901690
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
@@ -1781,7 +1781,7 @@ public void trimUnreferencedReaders() throws IOException {
17811781
}
17821782
}
17831783

1784-
private long getMinReferencedGen() throws IOException {
1784+
protected long getMinReferencedGen() throws IOException {
17851785
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
17861786
long minReferencedGen = Math.min(
17871787
deletionPolicy.minTranslogGenRequired(readers, current),
@@ -1800,6 +1800,12 @@ private long getMinReferencedGen() throws IOException {
18001800
return minReferencedGen;
18011801
}
18021802

1803+
/*
1804+
Min Seq number required in translog to restore the complete data .
1805+
This might be required when segments are persisted via other mechanism than flush.
1806+
*/
1807+
protected void setMinSeqNoToKeep(long seqNo) {}
1808+
18031809
/**
18041810
* deletes all files associated with a reader. package-private to be able to simulate node failures at this point
18051811
*/

server/src/main/java/org/opensearch/index/translog/TranslogManager.java

+7
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,11 @@ public interface TranslogManager {
114114
* Checks if the translog has a pending recovery
115115
*/
116116
void ensureCanFlush();
117+
118+
/**
119+
*
120+
* @param seqNo : operations greater or equal to seqNo should be persisted
121+
* This might be required when segments are persisted via other mechanism than flush.
122+
*/
123+
void setMinSeqNoToKeep(long seqNo);
117124
}

server/src/main/java/org/opensearch/index/translog/TranslogWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
359359
*
360360
* Note: any exception during the sync process will be interpreted as a tragic exception and the writer will be closed before
361361
* raising the exception.
362-
* @return
362+
* @return <code>true</code> if this call caused an actual sync operation
363363
*/
364364
public boolean sync() throws IOException {
365365
return syncUpTo(Long.MAX_VALUE);

server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.io.InputStream;
22+
import java.util.List;
2223
import java.util.Set;
2324
import java.util.concurrent.ExecutorService;
2425

@@ -75,6 +76,11 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
7576
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
7677
}
7778

79+
@Override
80+
public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException {
81+
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
82+
}
83+
7884
@Override
7985
public Set<String> listAll(Iterable<String> path) throws IOException {
8086
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();

server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java

+5
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
5555
add(fileSnapshot.getName(), TransferState.FAILED);
5656
}
5757

58+
@Override
59+
public void onDelete(String name) {
60+
fileTransferTracker.remove(name);
61+
}
62+
5863
public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
5964
return original.stream()
6065
.filter(fileSnapshot -> fileTransferTracker.get(fileSnapshot.getName()) != TransferState.SUCCESS)

server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.io.IOException;
1515
import java.io.InputStream;
16+
import java.util.List;
1617
import java.util.Set;
1718

1819
/**
@@ -42,6 +43,8 @@ void uploadBlobAsync(
4243
*/
4344
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath) throws IOException;
4445

46+
/**
 * Deletes the given files from the remote path.
 *
 * @param path      the remote path containing the files
 * @param fileNames the names of the files to delete
 * @throws IOException the exception while deleting the files
 */
void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;
47+
4548
/**
4649
* Lists the files
4750
* @param path : the path to list
@@ -52,8 +55,8 @@ void uploadBlobAsync(
5255

5356
/**
5457
*
55-
* @param path
56-
* @param fileName
58+
* @param path the remote path from where download should be made
59+
* @param fileName the name of the file
5760
* @return inputstream of the remote file
5861
* @throws IOException the exception while reading the data
5962
*/

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

+11
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,15 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
194194
translogTransferMetadata.getPrimaryTerm()
195195
);
196196
}
197+
198+
public void deleteTranslog(long primaryTerm, long generation) throws IOException {
199+
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
200+
String translogFilename = Translog.getFilename(generation);
201+
// ToDo - Take care of metadata file cleanup
202+
// https://github.com/opensearch-project/OpenSearch/issues/5677
203+
fileTransferTracker.onDelete(ckpFileName);
204+
fileTransferTracker.onDelete(translogFilename);
205+
List<String> files = List.of(ckpFileName, translogFilename);
206+
transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files);
207+
}
197208
}

server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
1212

1313
/**
14-
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot}
14+
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} or deletion of file
1515
*
1616
* @opensearch.internal
1717
*/
@@ -29,4 +29,6 @@ public interface FileTransferListener {
2929
* @param e the exception while processing the {@link TransferFileSnapshot}
3030
*/
3131
void onFailure(TransferFileSnapshot fileSnapshot, Exception e);
32+
33+
/**
 * Invoked when a file is deleted.
 *
 * @param name the name of the deleted file
 */
void onDelete(String name);
3234
}

server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java

+36-3
Original file line numberDiff line numberDiff line change
@@ -448,13 +448,13 @@ public void testSimpleOperationsUpload() throws IOException {
448448
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
449449
}
450450

451-
assertEquals(translog.allUploaded().size(), 4);
451+
assertEquals(translog.allUploaded().size(), 2);
452452

453453
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));
454-
assertEquals(translog.allUploaded().size(), 6);
454+
assertEquals(translog.allUploaded().size(), 4);
455455

456456
translog.rollGeneration();
457-
assertEquals(translog.allUploaded().size(), 6);
457+
assertEquals(translog.allUploaded().size(), 4);
458458

459459
Set<String> mdFiles = blobStoreTransferService.listAll(
460460
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
@@ -495,6 +495,38 @@ public void testSimpleOperationsUpload() throws IOException {
495495
assertArrayEquals(ckp, content);
496496
}
497497
}
498+
499+
// expose the new checkpoint (simulating a commit), before we trim the translog
500+
translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0);
501+
// simulating the remote segment upload .
502+
translog.setMinSeqNoToKeep(0);
503+
// This should not trim anything
504+
translog.trimUnreferencedReaders();
505+
assertEquals(translog.allUploaded().size(), 4);
506+
assertEquals(
507+
blobStoreTransferService.listAll(
508+
repository.basePath()
509+
.add(shardId.getIndex().getUUID())
510+
.add(String.valueOf(shardId.id()))
511+
.add(String.valueOf(primaryTerm.get()))
512+
).size(),
513+
4
514+
);
515+
516+
// This should trim tlog-2.* files as it contains seq no 0
517+
translog.setMinSeqNoToKeep(1);
518+
translog.trimUnreferencedReaders();
519+
assertEquals(translog.allUploaded().size(), 2);
520+
assertEquals(
521+
blobStoreTransferService.listAll(
522+
repository.basePath()
523+
.add(shardId.getIndex().getUUID())
524+
.add(String.valueOf(shardId.id()))
525+
.add(String.valueOf(primaryTerm.get()))
526+
).size(),
527+
2
528+
);
529+
498530
}
499531

500532
private Long populateTranslogOps(boolean withMissingOps) throws IOException {
@@ -684,6 +716,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
684716
// expose the new checkpoint (simulating a commit), before we trim the translog
685717
lastCommittedLocalCheckpoint.set(localCheckpoint);
686718
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
719+
translog.setMinSeqNoToKeep(localCheckpoint + 1);
687720
translog.trimUnreferencedReaders();
688721
}
689722
}

server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {
8484
public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
8585
fileTransferFailed.incrementAndGet();
8686
}
87+
88+
@Override
89+
public void onDelete(String name) {}
8790
}
8891
);
8992

0 commit comments

Comments
 (0)