Skip to content

Commit

Permalink
Use lastSyncedGlobalCheckpoint in deletion policy (#27826)
Browse files Browse the repository at this point in the history
Today we use the in-memory global checkpoint from SequenceNumbersService
to clean up unneeded commit points, however the latest global checkpoint
may haven't fsynced to the disk yet. If the translog checkpoint fsync
failed and we already use a higher global checkpoint to clean up commit
points, then we may have removed a safe commit which we try to keep for
recovery.

This commit updates the deletion policy using lastSyncedGlobalCheckpoint
from Translog rather the in memory global checkpoint.

Relates #27606
  • Loading branch information
dnhatn authored Dec 16, 2017
1 parent 43ff38c commit 4f62b51
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,11 @@ public InternalEngine(EngineConfig engineConfig) {
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
translog = openTranslog(engineConfig, translogDeletionPolicy, seqNoService::getGlobalCheckpoint);
assert translog.getGeneration() != null;
this.translog = translog;
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
Expand All @@ -195,9 +198,6 @@ public InternalEngine(EngineConfig engineConfig) {
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint());
assert translog.getGeneration() != null;
this.translog = translog;
updateWriterOnOpen();
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand Down Expand Up @@ -437,12 +437,12 @@ private void recoverFromTranslogInternal() throws IOException {
translog.trimUnreferencedReaders();
}

private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
assert openMode != null;
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
String translogUUID = null;
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
translogUUID = loadTranslogUUIDFromCommit(writer);
translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
if (translogUUID == null) {
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
Expand Down Expand Up @@ -492,14 +492,13 @@ public long getWritingBytes() {
}

/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException {
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
final Map<String, String> commitUserData = commitDataAsMap(writer);
private String loadTranslogUUIDFromLastCommit() throws IOException {
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG :
"Only reuse existing translogUUID with OPEN_INDEX_AND_TRANSLOG; openMode = [" + openMode + "]";
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY)) {
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -4263,29 +4262,40 @@ public long getGlobalCheckpoint() {
.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomFrom("-1", "512b", "1gb")));
indexSettings.updateIndexMetaData(builder.build());

final Path translogPath = createTempDir();
store = createStore();
try (InternalEngine engine
= createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, seqNoServiceSupplier)) {
final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null);
try (Engine engine = new InternalEngine(engineConfig, seqNoServiceSupplier) {
@Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog
// (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService.
if (rarely()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), seqNoService().getLocalCheckpoint()));
}
super.commitIndexWriter(writer, translog, syncId);
}
}){
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
if (frequently()) {
globalCheckpoint.set(randomIntBetween(
Math.toIntExact(engine.seqNoService().getGlobalCheckpoint()),
Math.toIntExact(engine.seqNoService().getLocalCheckpoint())));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.seqNoService().getLocalCheckpoint()));
engine.getTranslog().sync();
}
if (frequently()) {
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath);
engine.flush(randomBoolean(), true);
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Keep only one safe commit as the oldest commit.
final IndexCommit safeCommit = commits.get(0);
assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
lessThanOrEqualTo(globalCheckpoint.get()));
lessThanOrEqualTo(lastSyncedGlobalCheckpoint));
for (int i = 1; i < commits.size(); i++) {
assertThat(Long.parseLong(commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThan(globalCheckpoint.get()));
greaterThan(lastSyncedGlobalCheckpoint));
}
// Make sure we keep all translog operations after the local checkpoint of the safe commit.
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
Expand Down

0 comments on commit 4f62b51

Please sign in to comment.