Introduce global checkpoint background sync #26591
Conversation
It is the exciting return of the global checkpoint background sync. Long, long ago, in a snapshot version far, far away, we had and only had a global checkpoint background sync. This sync would fire periodically and send the global checkpoint from the primary shard to the replicas so that they could update their local knowledge of the global checkpoint. Later, as we sped ahead towards finalizing the initial version of sequence IDs, we realized that we needed the global checkpoint updates to be inline. This means that on a replication operation, the primary shard would piggyback the global checkpoint with the replication operation to the replicas. The replicas would update their local knowledge of the global checkpoint and reply with their local checkpoint. However, this could allow the global checkpoint on the primary to advance again while the replicas fell behind in their local knowledge of the global checkpoint. If another replication operation never fired, the replicas would be permanently behind. To account for this, we added one more sync that would fire when the primary shard fell idle. However, this has problems:

- the shard idle timer defaults to five minutes, a long time to wait for the replicas to learn of the new global checkpoint
- if a replica missed the sync, there was no follow-up sync to catch it up
- there is an inherent race condition where the primary shard could fall idle mid-operation (after having sent the replication request to the replicas); in this case, there would never be a background sync after the operation completes
- tying the global checkpoint sync to the idle timer was never natural

To fix this, we add back a global checkpoint background sync that fires on a timer. This timer fires every thirty seconds and is not configurable (for simplicity). This background sync is smarter in the sense that it only sends a sync if the global checkpoint on at least one replica is lagging that of the primary. This necessitates the primary shard tracking its knowledge of the global checkpoint on each replica. When the timer fires, we can compare the global checkpoint on the primary to its knowledge of the global checkpoint on the replicas and only send a sync if there is a shard behind. During replication operations, it can be the case that the timer fires and sends a sync that would be covered by an in-flight operation. This is okay: the extra sync does not hurt, and we do not need the complexity of optimizing away this duplicate sync.
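To make the timer-driven check concrete, here is a minimal sketch in plain Java of the bookkeeping described above, assuming a simple map from allocation ID to the primary's knowledge of each replica's global checkpoint; the class and method names are illustrative and simplified, not the actual GlobalCheckpointTracker API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the bookkeeping described above: the
// primary records its knowledge of each replica's global checkpoint, and the
// thirty-second timer only triggers a sync when at least one copy lags.
class SimplifiedCheckpointTracker {

    // allocation id -> the primary's knowledge of that copy's global checkpoint
    private final Map<String, Long> globalCheckpoints = new HashMap<>();

    // -2 mirrors SequenceNumbers.UNASSIGNED_SEQ_NO in Elasticsearch
    private long primaryGlobalCheckpoint = -2;

    void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) {
        // checkpoints only move forward, so keep the maximum seen so far
        globalCheckpoints.merge(allocationId, globalCheckpoint, Math::max);
    }

    void advancePrimaryGlobalCheckpoint(long globalCheckpoint) {
        primaryGlobalCheckpoint = Math.max(primaryGlobalCheckpoint, globalCheckpoint);
    }

    // invoked when the background timer fires
    boolean syncNeeded() {
        return globalCheckpoints.values().stream().anyMatch(v -> v < primaryGlobalCheckpoint);
    }
}
```

As the description notes, an extra sync triggered while an operation is in flight is harmless, so this check makes no attempt to suppress duplicate syncs.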
I think I found some issues regarding recoveries and failing shards. I will have to give this another close look; just some initial thoughts to share for now.
 *
 * @return the global checkpoints for all shards
 */
synchronized ObjectLongMap<String> getGlobalCheckpoints(final String allocationId) {
I think we can set the allocation id of the primary when creating the tracker in the constructor (we then also don't need to pass it anymore to the activatePrimaryMode method).
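A minimal sketch of this suggestion, with illustrative names rather than the real tracker class: fixing the allocation ID at construction time removes the parameter from activatePrimaryMode.

```java
// Illustrative sketch only: the allocation id is set once in the constructor,
// so activatePrimaryMode no longer needs to receive it.
class TrackerSketch {

    private final String shardAllocationId; // fixed for the tracker's lifetime
    private boolean primaryMode = false;

    TrackerSketch(String shardAllocationId) {
        this.shardAllocationId = shardAllocationId;
    }

    void activatePrimaryMode(long localCheckpoint) { // no allocation id parameter
        primaryMode = true;
        // ... initialize checkpoint state for shardAllocationId ...
    }
}
```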
Addressed by #26630.
assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> copy = new ObjectLongHashMap<>(globalCheckpoints);
copy.put(allocationId, globalCheckpoint);
With the above suggestion (allocation id of primary set during construction), we can just have a `globalCheckpoints` map that replaces the current `globalCheckpoint` variable. The global checkpoint of the primary is then tracked in the same map as the other shard copies. An alternative is to generalize `localCheckPoints` to just `checkPoints` and put the global checkpoint info into that same map entry used for the local checkpoint tracking. This would also allow that info to be transferred during primary relocation handoff (neat?).
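A rough sketch of that "one map" alternative, assuming a hypothetical CheckpointState holder; the real field names in the tracker may differ:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggested generalization: each shard copy gets a
// single entry carrying both checkpoints, so global checkpoint knowledge rides
// along with the existing local checkpoint bookkeeping and can be transferred
// during primary relocation handoff.
class CheckpointState {
    long localCheckpoint;
    long globalCheckpoint;

    CheckpointState(long localCheckpoint, long globalCheckpoint) {
        this.localCheckpoint = localCheckpoint;
        this.globalCheckpoint = globalCheckpoint;
    }
}

class UnifiedTrackerSketch {
    // allocation id -> combined state, replacing separate local/global maps
    final Map<String, CheckpointState> checkpoints = new HashMap<>();
}
```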
Addressed by #26666.
@@ -585,6 +619,7 @@ public synchronized void completeRelocationHandoff() {
        lcps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
    }
});
globalCheckpoints.clear();
What about removing map entries when replicas fail etc. and are removed by the cluster state (and cleaned from the localCheckpoints map)? Note that in the above scenario `maybeSyncGlobalCheckpoint` will execute the sync again and again (also a good test case?).
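A small sketch of the pruning being asked for, assuming the tracker receives the set of currently assigned allocation IDs from the cluster state; names are illustrative:

```java
import java.util.Map;
import java.util.Set;

// Illustrative sketch: when a cluster state update removes shard copies, drop
// their entries so a failed replica cannot make the sync check fire forever.
class TrackerPruning {
    static void pruneRemovedShards(Map<String, Long> globalCheckpoints, Set<String> assignedAllocationIds) {
        // keep only entries whose allocation id still appears in the cluster state
        globalCheckpoints.keySet().retainAll(assignedAllocationIds);
    }
}
```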
Addressed by #26666.
assert globalCheckpoints.containsKey(allocationId);
final long globalCheckpoint = globalCheckpoints.get(allocationId);
final boolean syncNeeded =
    StreamSupport.stream(globalCheckpoints.values().spliterator(), false).anyMatch(v -> v.value < globalCheckpoint);
This check is very coarse. With the info available in GlobalCheckpointTracker, we could consider only the global checkpoints of in-sync copies. In the case of a recovering shard, there is no point in running the global checkpoint sync again and again.
Thinking some more about this, I think that the information for non-in-sync shard copies is even incorrect at the moment. In the case where we replicated a document while the target shard was recovering, it might not have set the global checkpoint information on that shard (see IndexShard#updateGlobalCheckpointOnReplica). We can therefore not consider the global checkpoint on the recovering shard to be correctly updated. Note that we rely on the recovery finalization to "fix" the global checkpoint info on the target by explicitly calling updateGlobalCheckpointOnReplica during recovery finalization.
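A sketch of the refinement these two comments point at, with simplified types: restrict the lagging-copy check to in-sync copies, so a recovering shard's possibly stale entry cannot keep re-triggering the sync.

```java
import java.util.Map;

// Simplified sketch: only in-sync copies participate in the "is anyone
// lagging?" decision; entries for recovering copies are ignored because their
// tracked global checkpoint may not have been set correctly yet.
class InSyncAwareCheckSketch {
    static boolean syncNeeded(long primaryGlobalCheckpoint,
                              Map<String, Long> globalCheckpoints,
                              Map<String, Boolean> inSync) {
        return globalCheckpoints.entrySet().stream()
                .filter(e -> inSync.getOrDefault(e.getKey(), false)) // skip recovering copies
                .anyMatch(e -> e.getValue() < primaryGlobalCheckpoint);
    }
}
```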
Addressed by #26666.
@@ -42,9 +42,22 @@
    Setting.timeSetting("index.translog.retention.check_interval", new TimeValue(10, TimeUnit.MINUTES),
        new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);

public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
    Setting.timeSetting(
        "index.global_checkpoint_sync.interval",
how about making this a String constant?
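A sketch of what that could look like, following the timeSetting signature already used in this file; the thirty-second default matches the PR description, while the class name and minimum value here are assumptions for illustration:

```java
import java.util.concurrent.TimeUnit;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;

public class GlobalCheckpointSyncSettingsSketch {

    // the key pulled out into a String constant, as suggested above
    public static final String GLOBAL_CHECKPOINT_SYNC_INTERVAL_KEY = "index.global_checkpoint_sync.interval";

    public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
            Setting.timeSetting(
                    GLOBAL_CHECKPOINT_SYNC_INTERVAL_KEY,
                    new TimeValue(30, TimeUnit.SECONDS),     // default from the PR description
                    new TimeValue(0, TimeUnit.MILLISECONDS), // assumed minimum, for illustration
                    Property.Dynamic,
                    Property.IndexScope);
}
```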
This setting is removed; it's not needed in the test case anymore.
I think this setting should be defined in the main source set (even if not registered), not here. Yeah, it follows a pattern here, but that pattern is horrible imo.
Okay, I pushed 7e6d1bf.
 * @param allocationId the allocation ID to update the global checkpoint for
 * @param globalCheckpoint the global checkpoint
 */
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
Should this also be called from RecoverySourceHandler after calling `recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint())`?
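A self-contained sketch of the call site being suggested; the interfaces below are stand-ins for the relevant slices of the real IndexShard and RecoveryTarget so the snippet compiles on its own.

```java
// Stand-in for the relevant slice of the real RecoveryTarget API.
interface RecoveryTargetStub {
    void finalizeRecovery(long globalCheckpoint);
}

// Stand-in for the relevant slice of the real IndexShard API.
interface PrimaryShardStub {
    long getGlobalCheckpoint();
    void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint);
}

class RecoveryFinalizationSketch {
    static void finalizeRecovery(PrimaryShardStub shard, RecoveryTargetStub target, String targetAllocationId) {
        target.finalizeRecovery(shard.getGlobalCheckpoint());
        // record on the primary that the target now knows the global checkpoint,
        // so the background sync does not keep firing for this copy
        shard.updateGlobalCheckpointForShard(targetAllocationId, shard.getGlobalCheckpoint());
    }
}
```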
Addressed by #26666.
public void testGlobalCheckpointSync() throws Exception {
    internalCluster().startNode();
    prepareCreate("test", Settings.builder().put(GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")).get();
    ensureGreen("test");
Why ensure green? I think the test becomes more interesting when we index while the cluster might not yet be green (see my comments on recovery).
I removed this.
* master: (67 commits)
  Restoring from snapshot should force generation of a new history uuid (elastic#26694)
  test: Use a single primary shard so that the exception can caught in the same way
  Move pre-6.0 node checkpoint to SequenceNumbers
  Invalid JSON request body caused endless loop (elastic#26680)
  added comment
  fix line length violation
  Moved the check to fetch phase. This basically means that we throw a better error message instead of an AOBE and not adding more restrictions.
  inner hits: Do not allow inner hits that use _source and have a non nested object field as parent
  Separate Painless Whitelist Loading from the Painless Definition (elastic#26540)
  convert more admin requests to writeable (elastic#26566)
  Handle release of 5.6.1
  Allow `InputStreamStreamInput` array size validation where applicable (elastic#26692)
  Update global checkpoint with permit after recovery
  Filter pre-6.0 nodes for checkpoint invariants
  Skip bad request REST test on pre-6.0
  Reenable BWC tests after disabling for backport
  Add global checkpoint tracking on the primary
  [Test] Fix reference/cat/allocation/line_8 test failure
  [Docs] improved description for fs.total.available_in_bytes (elastic#26657)
  Fix discovery-file plugin to use custom config path
  ...
* master:
  Remove assertion from checkpoint tracker invariants
  Upgrade API: fix excessive logging and unnecessary template updates (elastic#26698)
* master:
  [DOCS] Added index-shared4 and index-shared5.asciidoc
  BulkProcessor flush runnable preserves the thread context from creation time (elastic#26718)
  Catch exceptions and inform handler in RemoteClusterConnection#collectNodes (elastic#26725)
  [Docs] Fix name of character filter in example. (elastic#26724)
  Remove parse field deprecations in query builders (elastic#26711)
  elastic#26720: Set the correct bwc version after backport to 6.0
  Remove deprecated type and slop field in MatchQueryBuilder (elastic#26720)
  Refactoring of Gateway*** classes (elastic#26706)
  Make RestHighLevelClient's Request class public (elastic#26627)
  Deguice ActionFilter (elastic#26691)
  aggs: Allow aggregation sorting via nested aggregation.
  Build: Set bwc builds to always set snapshot (elastic#26704)
  File Discovery: Remove fallback with zen discovery (elastic#26667)
retest this please
LGTM - yay for super-fast gcp syncs
    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
));
.setSettings(Settings.builder()
IDE gone rogue in this file? :D
I concurrently pushed b640b10. 😄
@@ -330,7 +330,8 @@ public IndexService newIndexService(
    IndicesQueryCache indicesQueryCache,
    MapperRegistry mapperRegistry,
    IndicesFieldDataCache indicesFieldDataCache,
    NamedWriteableRegistry namedWriteableRegistry)
    NamedWriteableRegistry namedWriteableRegistry,
    Consumer<ShardId> globalCheckpointSyncer)
remove this and remove all changes to IndicesService.createIndex etc.
I pushed 26e4c76.
        Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
        PrimaryShardReference primaryShardReference) {
    Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
    PrimaryShardReference primaryShardReference) {
IDE gone rogue
I pushed b8adcce.
    final Consumer<Client> afterIndexing) throws Exception {
final int numberOfReplicas = randomIntBetween(1, 4);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
// set the sync interval high so it does not execute during this test
this comment belongs to the calling method.
I pushed b80b728.
I pushed f3b04dc.
* master:
  Add permission checks before reading from HDFS stream (elastic#26716)
  muted test
  [Docs] Fixed typo of *configuration* (elastic#25058)
  Add azure storage endpoint suffix elastic#26432 (elastic#26568)
It is the exciting return of the global checkpoint background sync. Long, long ago, in a snapshot version far, far away, we had and only had a global checkpoint background sync. This sync would fire periodically and send the global checkpoint from the primary shard to the replicas so that they could update their local knowledge of the global checkpoint. Later, as we sped ahead towards finalizing the initial version of sequence IDs, we realized that we needed the global checkpoint updates to be inline. This means that on a replication operation, the primary shard would piggyback the global checkpoint with the replication operation to the replicas. The replicas would update their local knowledge of the global checkpoint and reply with their local checkpoint. However, this could allow the global checkpoint on the primary to advance again while the replicas fell behind in their local knowledge of the global checkpoint. If another replication operation never fired, the replicas would be permanently behind. To account for this, we added one more sync that would fire when the primary shard fell idle. However, this has problems:

- the shard idle timer defaults to five minutes, a long time to wait for the replicas to learn of the new global checkpoint
- if a replica missed the sync, there was no follow-up sync to catch it up
- there is an inherent race condition where the primary shard could fall idle mid-operation (after having sent the replication request to the replicas); in this case, there would never be a background sync after the operation completes
- tying the global checkpoint sync to the idle timer was never natural

To fix this, we add two additional ways for the global checkpoint to be synced to the replicas. The first is a post-operation sync that only fires if there are no operations in flight and there is a lagging replica. This gives us a chance to sync the global checkpoint to the replicas immediately after an operation so that they are always kept up to date. The second is that we add back a global checkpoint background sync that fires on a timer. This timer fires every thirty seconds and is not configurable (for simplicity). This background sync is smarter than what we had previously in the sense that it only sends a sync if the global checkpoint on at least one replica is lagging that of the primary. When the timer fires, we can compare the global checkpoint on the primary to its knowledge of the global checkpoint on the replicas and only send a sync if there is a shard behind.

Relates #26591
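The post-operation decision described above boils down to a small predicate; this is a hedged sketch with illustrative names, not the exact IndexShard code.

```java
// Illustrative sketch of the post-operation sync decision: an in-flight
// operation will piggyback the global checkpoint anyway, so only the last
// operation to complete needs to trigger a sync, and only if a replica lags.
class PostOperationSyncSketch {
    static boolean shouldSyncAfterOperation(int operationsInFlight, boolean replicaLagging) {
        return operationsInFlight == 0 && replicaLagging;
    }
}
```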
This commit reenables the BWC tests after the introduction of the post-operation and background global checkpoint sync. Relates #26591
Thanks @ywelsch.
Closes #26573, relates #26630, relates #26666