-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rollback a primary before recovering from translog #27804
Conversation
Today we always recover a primary from the last commit point. However with a new deletion policy, we keep multiple commit points in the existing store, thus we have chance to find a good starting commit point. With a good starting commit point, we may be able to throw away stale operations. This PR rollbacks a primary to a starting commit then recovering from translog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Nhat. I don't really like the fact that we duplicate the logic in the CombinedDeletionPolicy. Instead, I'd try to make static methods in that class which can be reused.
Second I don't think we should delete commits. Instead I was thinking to use the ability to open an IndexWriter on a specific commit. For this we'd use the same commit we identify in the deletion policy. Wdyt?
@bleskes I agree. I've updated the PR to let an engine open a starting commit point. Could you please take a look? Thank you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx Nhat. I left some questions.
throw new AssertionError("unknown recovery type: [" + recoveryType + "]"); | ||
} | ||
final IndexCommit startingCommit; | ||
if (recoveryType == RecoverySource.Type.EXISTING_STORE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this needs to be out of engine? can't we do it in the engine construct as an invariant when opening an index (and translog)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I will move it to the Engine.
// Snapshotted commits may not have all its required translog. | ||
final List<IndexCommit> recoverableCommits = new ArrayList<>(); | ||
for (IndexCommit commit : commits) { | ||
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we do this? isn't the logic in indexOfKeptCommits enough to deal with this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the previous 6.x versions, we keep the last commit and translog for that commit only. If we take a snapshot and commit, we will have two commits but translog for the last commit only. During the store recovery, if the max_seqno of the last commit is greater than the global checkpoint, the Policy will pick the snapshotted commit although it does not have full translog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I think we should keep this class clean and put this prefiltering in the engine, if we open an index created before 6.2. This way it will be clear when we can remove it.
} else { | ||
assert commits.contains(startingIndexCommit) : "Existing commits must contain the starting commit; " + | ||
"startingCommit [" + startingIndexCommit + "], commits [" + commits + "]"; | ||
commits.stream().filter(commit -> startingIndexCommit.equals(commit) == false).forEach(IndexCommit::delete); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need special handling here and need the start commit point? can you explain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I calculated the retained translog generations incorrectly; I will revert this change. There is an issue with the local checkpoint; I will reach out to discuss with you @bleskes.
@bleskes, I've moved the starting commit point to the InternalEngine and removed the special case in the CombinedDeletionPolicy. Would you please take another look? Thank you. |
# Conflicts: # core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java # core/src/main/java/org/elasticsearch/index/store/Store.java # core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
This reverts commit 4c4a1c7.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx @dnhatn . I left some more suggestions
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path)} | ||
* @param minRetainedTranslogGen the minimum translog generation is retained, see {@link Translog#readMinReferencedTranslogGen(Path)} | ||
*/ | ||
public static IndexCommit startingCommitPoint(List<IndexCommit> commits, long globalCheckpoint, long minRetainedTranslogGen) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering - should we call this findSafeCommit? I suspect that this will be use full for later too.
// Snapshotted commits may not have all its required translog. | ||
final List<IndexCommit> recoverableCommits = new ArrayList<>(); | ||
for (IndexCommit commit : commits) { | ||
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I think we should keep this class clean and put this prefiltering in the engine, if we open an index created before 6.2. This way it will be clear when we can remove it.
@@ -179,6 +182,9 @@ public InternalEngine(EngineConfig engineConfig) { | |||
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); | |||
throttle = new IndexThrottle(); | |||
try { | |||
this.startingCommit = getStartingCommitPoint(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to make it a field? can't it be a parameter to createWriter ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used in two other places: #recoverFromTranslogInternal and #createLocalCheckpointTracker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be a parameter to createLocalCheckpointTracker and I left a comment on recoverFromTranslogInternal . The less state the better ;)
} | ||
return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i like the final approach better this gives a uniform return value (easier to understand) and it makes sure these things are set.
private void recoverFromTranslogInternal() throws IOException { | ||
Translog.TranslogGeneration translogGeneration = translog.getGeneration(); | ||
final int opsRecovered; | ||
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); | ||
final long translogGen = Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the lastCommittedSegmentInfo should be set to the opening commit, no?
@bleskes, I have addressed your feedbacks. Would you please give it another go. Thank you. |
please test this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx @dnhatn . I think we're very close. I left another bunch of minor comments
@@ -90,12 +91,26 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f | |||
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); | |||
} | |||
|
|||
/** | |||
* Find a safe commit point from a list of existing commits based on the persisted global checkpoint from translog. | |||
* The max seqno of a safe commit point should be at most the global checkpoint from the translog checkpoint. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a huge warning that says that if the index was created before 6.2 we can't guarantee we'll find one and that in that case we return the oldest valid commit?
@@ -90,12 +91,26 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f | |||
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); | |||
} | |||
|
|||
/** | |||
* Find a safe commit point from a list of existing commits based on the persisted global checkpoint from translog. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
persisted global checkpoint from the translog -> supplied global checkpoint.
@@ -177,14 +180,17 @@ public InternalEngine(EngineConfig engineConfig) { | |||
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); | |||
throttle = new IndexThrottle(); | |||
try { | |||
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); | |||
final IndexCommit startingCommit = getStartingCommitPoint(); | |||
assert startingCommit == null || openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this boolean does what you want? if start commit is null it will always pass.
@@ -243,8 +249,14 @@ private LocalCheckpointTracker createLocalCheckpointTracker( | |||
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED; | |||
break; | |||
case OPEN_INDEX_AND_TRANSLOG: | |||
// When recovering from a previous commit point, we use the local checkpoint from that commit, | |||
// but the max_seqno from the last commit. This allows use to throw away stale operations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this distinction? can't we keep it conceptually simple? also we recover everything from the translog, so I'm not sure how it can happen that we throw away stuff?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
If we use the local checkpoint from the last commit, operations having seqno less than or equal to the local checkpoint will be skipped -> we need to use the local checkpoint from the starting commit.
-
We fillSeqNoGaps until the max_seqno -> we need to use max_seqno from the last commit to have full history.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We fillSeqNoGaps until the max_seqno -> we need to use max_seqno from the last commit to have full history.
We do so after we replay the translog. at which point we'll know what the max seq is (and it may be higher than the one in the last commit, but it can't be lower)
recoverableCommits.add(commit); | ||
} | ||
} | ||
assert recoverableCommits.isEmpty() == false : "Unable to select a proper safe commit point; " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you want to say that no commit point was found which could be recovered from the translog.
// To avoid this issue, we only select index commits whose translog files are fully retained. | ||
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { | ||
final List<IndexCommit> recoverableCommits = new ArrayList<>(); | ||
final long minRetainedTranslogGen = Translog.readMinReferencedTranslogGen(translogPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we open the translog before the writer now - can we use the open translog for all this information rather than static reading it? this will make sure that we use something that went through all the right validations.
@@ -521,7 +559,11 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear | |||
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); | |||
internalSearcherManager = new SearcherManager(directoryReader, | |||
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); | |||
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store); | |||
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain why we need this destinction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we open other commit rather the last commit, we should assign lastCommittedSegmentInfos from that commit. I can revert this change and add this logic to recoverFromTranslogInternal
. Your thought?
@@ -1121,8 +1120,12 @@ public void testRenewSyncFlush() throws Exception { | |||
} | |||
|
|||
public void testSyncedFlushSurvivesEngineRestart() throws IOException { | |||
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpointTracker().getCheckpoint(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this needed?
@@ -1092,6 +1086,7 @@ public void testAcquireIndexCommit() throws Exception { | |||
public void testSnapshotStore() throws IOException { | |||
final IndexShard shard = newStartedShard(true); | |||
indexDoc(shard, "test", "0"); | |||
shard.updateLocalCheckpointForShard(shard.shardRouting.allocationId().getId(), 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I presume you did this to make sure the global checkpoint advances. I wonder if we should fold into indexDoc? it's sneaky
flushShard(shard); | ||
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); | ||
// Simulate resync (without rollback): Noop #1, index #2 | ||
shard.markSeqNoAsNoop(1, "test"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need a primary term of 2 here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I left some minor suggestions. No need for another review cycle. I think the stats (+191, -75) show we got this to the essence. Thanks for all the iterations @dnhatn
@@ -521,7 +550,7 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear | |||
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); | |||
internalSearcherManager = new SearcherManager(directoryReader, | |||
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); | |||
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store); | |||
lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a comment that indexWriter.getConfig().getIndexCommit()
can be null?
try { | ||
final IndexWriterConfig iwc = getIndexWriterConfig(create); | ||
assert startingCommit == null || create == false : "Starting commit makes sense only when create=false"; | ||
iwc.setIndexCommit(startingCommit); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be part of getIndexWriterConfig?
@@ -2514,6 +2518,7 @@ private Mapping dynamicUpdate() { | |||
} | |||
|
|||
public void testTranslogReplay() throws IOException { | |||
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpointTracker().getCheckpoint(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we should make this the default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the time, we prefer to deal with an in-sync global checkpoint, thus making this as a default makes sense to me. I will do in a follow-up.
@bleskes, Thanks a lot for your review. |
Today we always recover a primary from the last commit point. However with a new deletion policy, we keep multiple commit points in the existing store, thus we have chance to find a good starting commit point. With a good starting commit point, we may be able to throw away stale operations. This PR rollbacks a primary to a starting commit then recovering from translog. Relates #10708
Keeping unsafe commits when opening an engine can be problematic because these commits are not safe at the recovering time but they can suddenly become safe in the future. The following issues can happen if unsafe commits are kept oninit. 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1 (max_seqno=1) and an unsafe commit c2 (max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document (seqno=2) is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use the unsafe commit c2 (max_seqno=2 <= gcp=2) as the starting commit for sequenced based recovery even the commit c2 contains a stale operation and the document (with seqno=2) will not be replicated to the replica. 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when a replica with a safe commit c1 (local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2 (local_checkpoint=2, recovery_translog_gen=2). The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 while the local checkpoint of c2 is 2. 3. Commit without translog can be used for recovery. An old index, which was created before multiple-commits is introduced (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. These issues can be avoided if the combined deletion policy keeps only the starting commit onInit. Relates #27804 Relates #28181
Keeping unsafe commits when opening an engine can be problematic because these commits are not safe at the recovering time but they can suddenly become safe in the future. The following issues can happen if unsafe commits are kept oninit. 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1 (max_seqno=1) and an unsafe commit c2 (max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document (seqno=2) is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use the unsafe commit c2 (max_seqno=2 <= gcp=2) as the starting commit for sequenced based recovery even the commit c2 contains a stale operation and the document (with seqno=2) will not be replicated to the replica. 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when a replica with a safe commit c1 (local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2 (local_checkpoint=2, recovery_translog_gen=2). The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 while the local checkpoint of c2 is 2. 3. Commit without translog can be used for recovery. An old index, which was created before multiple-commits is introduced (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. These issues can be avoided if the combined deletion policy keeps only the starting commit onInit. Relates #27804 Relates #28181
Today we always recover a primary from the last commit point. However with a new deletion policy, we keep multiple commit points in the existing store, thus we have chance to find a good starting commit point. With a good starting commit point, we may be able to throw away stale operations. This PR rollbacks a primary to a starting commit then recovering from translog.
Relates #10708