Skip to content

Commit bae9aab

Browse files
committed
Add logging to index commit deletion policy (#28448)
This would help us to figure out which index commit that an engine started with or used in peer-recovery. Relates #28405
1 parent 19e9eb3 commit bae9aab

File tree

6 files changed

+66
-21
lines changed

6 files changed

+66
-21
lines changed

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.index.engine;
2121

2222
import com.carrotsearch.hppc.ObjectIntHashMap;
23+
import org.apache.logging.log4j.Logger;
2324
import org.apache.lucene.index.IndexCommit;
2425
import org.apache.lucene.index.IndexDeletionPolicy;
2526
import org.apache.lucene.store.Directory;
@@ -31,6 +32,7 @@
3132
import java.nio.file.Path;
3233
import java.util.Collection;
3334
import java.util.List;
35+
import java.util.Locale;
3436
import java.util.Map;
3537
import java.util.function.LongSupplier;
3638

@@ -42,6 +44,7 @@
4244
* the current global checkpoint except the index commit which has the highest max sequence number among those.
4345
*/
4446
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
47+
private final Logger logger;
4548
private final TranslogDeletionPolicy translogDeletionPolicy;
4649
private final EngineConfig.OpenMode openMode;
4750
private final LongSupplier globalCheckpointSupplier;
@@ -50,9 +53,10 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
5053
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
5154
private volatile IndexCommit lastCommit; // the most recent commit point
5255

53-
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
56+
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
5457
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
5558
this.openMode = openMode;
59+
this.logger = logger;
5660
this.translogDeletionPolicy = translogDeletionPolicy;
5761
this.globalCheckpointSupplier = globalCheckpointSupplier;
5862
this.startingCommit = startingCommit;
@@ -104,8 +108,12 @@ public synchronized void onInit(List<? extends IndexCommit> commits) throws IOEx
104108
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
105109
* the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
106110
*/
107-
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) {
108-
commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
111+
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
112+
for (IndexCommit commit : commits) {
113+
if (startingCommit.equals(commit) == false) {
114+
this.deleteCommit(commit);
115+
}
116+
}
109117
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
110118
lastCommit = startingCommit;
111119
safeCommit = startingCommit;
@@ -118,14 +126,22 @@ public synchronized void onCommit(List<? extends IndexCommit> commits) throws IO
118126
safeCommit = commits.get(keptPosition);
119127
for (int i = 0; i < keptPosition; i++) {
120128
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
121-
commits.get(i).delete();
129+
deleteCommit(commits.get(i));
122130
}
123131
}
124132
updateTranslogDeletionPolicy();
125133
}
126134

135+
private void deleteCommit(IndexCommit commit) throws IOException {
136+
assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice";
137+
logger.debug("Delete index commit [{}]", commitDescription(commit));
138+
commit.delete();
139+
assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
140+
}
141+
127142
private void updateTranslogDeletionPolicy() throws IOException {
128143
assert Thread.holdsLock(this);
144+
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
129145
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
130146
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
131147
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
@@ -239,6 +255,13 @@ boolean hasUnreferencedCommits() throws IOException {
239255
return false;
240256
}
241257

258+
/**
259+
* Returns a description for a given {@link IndexCommit}. This should be only used for logging and debugging.
260+
*/
261+
public static String commitDescription(IndexCommit commit) throws IOException {
262+
return String.format(Locale.ROOT, "CommitPoint{segment[%s], userData[%s]}", commit.getSegmentsFileName(), commit.getUserData());
263+
}
264+
242265
/**
243266
* A wrapper of an index commit that prevents it from being deleted.
244267
*/

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public InternalEngine(EngineConfig engineConfig) {
185185
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), startingCommit);
186186
assert translog.getGeneration() != null;
187187
this.translog = translog;
188-
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
188+
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, logger, translogDeletionPolicy,
189189
translog::getLastSyncedGlobalCheckpoint, startingCommit);
190190
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
191191
updateMaxUnsafeAutoIdTimestampFromWriter(writer);

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.indices.recovery;
2121

22+
import org.apache.logging.log4j.Logger;
2223
import org.apache.logging.log4j.message.ParameterizedMessage;
2324
import org.apache.logging.log4j.util.Supplier;
2425
import org.apache.lucene.index.DirectoryReader;
@@ -64,6 +65,7 @@
6465

6566
import java.io.IOException;
6667
import java.util.List;
68+
import java.util.StringJoiner;
6769
import java.util.concurrent.atomic.AtomicLong;
6870
import java.util.concurrent.atomic.AtomicReference;
6971

@@ -320,7 +322,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
320322

321323
final long startingSeqNo;
322324
if (metadataSnapshot.size() > 0) {
323-
startingSeqNo = getStartingSeqNo(recoveryTarget);
325+
startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
324326
} else {
325327
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
326328
}
@@ -354,12 +356,20 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
354356
* @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
355357
* failed
356358
*/
357-
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
359+
public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
358360
try {
359361
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
360362
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
361363
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
362364
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
365+
if (logger.isTraceEnabled()) {
366+
final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
367+
for (IndexCommit commit : existingCommits) {
368+
descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
369+
}
370+
logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]",
371+
globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits);
372+
}
363373
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
364374
assert seqNoStats.localCheckpoint <= globalCheckpoint;
365375
/*

server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
5656
final AtomicLong globalCheckpoint = new AtomicLong();
5757
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
5858
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
59-
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
59+
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
6060

6161
final LongArrayList maxSeqNoList = new LongArrayList();
6262
final LongArrayList translogGenList = new LongArrayList();
@@ -96,7 +96,7 @@ public void testAcquireIndexCommit() throws Exception {
9696
final UUID translogUUID = UUID.randomUUID();
9797
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
9898
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
99-
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
99+
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
100100
long lastMaxSeqNo = between(1, 1000);
101101
long lastTranslogGen = between(1, 20);
102102
int safeIndex = 0;
@@ -117,6 +117,7 @@ public void testAcquireIndexCommit() throws Exception {
117117
long upper = safeIndex == commitList.size() - 1 ? lastMaxSeqNo :
118118
Long.parseLong(commitList.get(safeIndex + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1;
119119
globalCheckpoint.set(randomLongBetween(lower, upper));
120+
commitList.forEach(this::resetDeletion);
120121
indexPolicy.onCommit(commitList);
121122
// Captures and releases some commits
122123
int captures = between(0, 5);
@@ -145,6 +146,7 @@ public void testAcquireIndexCommit() throws Exception {
145146
}
146147
snapshottingCommits.forEach(indexPolicy::releaseCommit);
147148
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
149+
commitList.forEach(this::resetDeletion);
148150
indexPolicy.onCommit(commitList);
149151
for (int i = 0; i < commitList.size() - 1; i++) {
150152
assertThat(commitList.get(i).isDeleted(), equalTo(true));
@@ -160,7 +162,7 @@ public void testLegacyIndex() throws Exception {
160162

161163
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
162164
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
163-
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
165+
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
164166

165167
long legacyTranslogGen = randomNonNegativeLong();
166168
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
@@ -181,6 +183,7 @@ public void testLegacyIndex() throws Exception {
181183
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
182184

183185
// Make the fresh commit safe.
186+
resetDeletion(legacyCommit);
184187
globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
185188
indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
186189
verify(legacyCommit, times(2)).delete();
@@ -194,7 +197,7 @@ public void testKeepSingleNoOpsCommits() throws Exception {
194197
final UUID translogUUID = UUID.randomUUID();
195198
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
196199
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
197-
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
200+
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
198201

199202
final List<IndexCommit> commitList = new ArrayList<>();
200203
final int numOfNoOpsCommits = between(1, 10);
@@ -222,6 +225,7 @@ public void testKeepSingleNoOpsCommits() throws Exception {
222225
}
223226
// If the global checkpoint is still unassigned, we should still keep one NO_OPS_PERFORMED commit.
224227
globalCheckpoint.set(SequenceNumbers.UNASSIGNED_SEQ_NO);
228+
commitList.forEach(this::resetDeletion);
225229
indexPolicy.onCommit(commitList);
226230
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastNoopTranslogGen));
227231
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
@@ -232,6 +236,7 @@ public void testKeepSingleNoOpsCommits() throws Exception {
232236
// Delete no-ops commit if global checkpoint advanced enough.
233237
final long lower = Long.parseLong(commitList.get(numOfNoOpsCommits).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
234238
globalCheckpoint.set(randomLongBetween(lower, Long.MAX_VALUE));
239+
commitList.forEach(this::resetDeletion);
235240
indexPolicy.onCommit(commitList);
236241
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), greaterThan(lastNoopTranslogGen));
237242
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
@@ -242,7 +247,7 @@ public void testDeleteInvalidCommits() throws Exception {
242247
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
243248
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
244249
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
245-
OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
250+
OPEN_INDEX_CREATE_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
246251

247252
final int invalidCommits = between(1, 10);
248253
final List<IndexCommit> commitList = new ArrayList<>();
@@ -280,7 +285,7 @@ public void testKeepOnlyStartingCommitOnInit() throws Exception {
280285
}
281286
final IndexCommit startingCommit = randomFrom(commitList);
282287
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
283-
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit);
288+
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, startingCommit);
284289
indexPolicy.onInit(commitList);
285290
for (IndexCommit commit : commitList) {
286291
if (commit.equals(startingCommit) == false) {
@@ -299,7 +304,7 @@ public void testCheckUnreferencedCommits() throws Exception {
299304
final UUID translogUUID = UUID.randomUUID();
300305
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
301306
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
302-
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
307+
OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
303308
final List<IndexCommit> commitList = new ArrayList<>();
304309
int totalCommits = between(2, 20);
305310
long lastMaxSeqNo = between(1, 1000);
@@ -311,6 +316,7 @@ public void testCheckUnreferencedCommits() throws Exception {
311316
}
312317
IndexCommit safeCommit = randomFrom(commitList);
313318
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
319+
commitList.forEach(this::resetDeletion);
314320
indexPolicy.onCommit(commitList);
315321
if (safeCommit == commitList.get(commitList.size() - 1)) {
316322
// Safe commit is the last commit - no need to clean up
@@ -324,6 +330,7 @@ public void testCheckUnreferencedCommits() throws Exception {
324330
// Advanced enough
325331
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
326332
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
333+
commitList.forEach(this::resetDeletion);
327334
indexPolicy.onCommit(commitList);
328335
// Safe commit is the last commit - no need to clean up
329336
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
@@ -337,17 +344,21 @@ IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen)
337344
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
338345
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
339346
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
340-
final AtomicBoolean deleted = new AtomicBoolean();
341347
final IndexCommit commit = mock(IndexCommit.class);
342348
final Directory directory = mock(Directory.class);
343349
when(commit.getUserData()).thenReturn(userData);
344350
when(commit.getDirectory()).thenReturn(directory);
351+
resetDeletion(commit);
352+
return commit;
353+
}
354+
355+
void resetDeletion(IndexCommit commit) {
356+
final AtomicBoolean deleted = new AtomicBoolean();
345357
when(commit.isDeleted()).thenAnswer(args -> deleted.get());
346358
doAnswer(arg -> {
347359
deleted.set(true);
348360
return null;
349361
}).when(commit).delete();
350-
return commit;
351362
}
352363

353364
IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
@@ -356,6 +367,7 @@ IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IO
356367
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
357368
final IndexCommit commit = mock(IndexCommit.class);
358369
when(commit.getUserData()).thenReturn(userData);
370+
resetDeletion(commit);
359371
return commit;
360372
}
361373
}

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public void testGetStartingSeqNo() throws Exception {
3333
{
3434
recoveryEmptyReplica(replica);
3535
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
36-
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
36+
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
3737
recoveryTarget.decRef();
3838
}
3939
// Last commit is good - use it.
@@ -49,7 +49,7 @@ public void testGetStartingSeqNo() throws Exception {
4949
replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test");
5050
replica.getTranslog().sync();
5151
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
52-
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs));
52+
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
5353
recoveryTarget.decRef();
5454
}
5555
// Global checkpoint does not advance, last commit is not good - use the previous commit
@@ -63,15 +63,15 @@ public void testGetStartingSeqNo() throws Exception {
6363
}
6464
flushShard(replica);
6565
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
66-
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs));
66+
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
6767
recoveryTarget.decRef();
6868
}
6969
// Advances the global checkpoint, a safe commit also advances
7070
{
7171
replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test");
7272
replica.getTranslog().sync();
7373
final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
74-
assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs + moreDocs));
74+
assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs));
7575
recoveryTarget.decRef();
7676
}
7777
} finally {

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ protected final void recoverReplica(final IndexShard replica,
463463
final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica);
464464
final long startingSeqNo;
465465
if (snapshot.size() > 0) {
466-
startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget);
466+
startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget);
467467
} else {
468468
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
469469
}

0 commit comments

Comments
 (0)