
Commit 031415a

Replicate writes only to fully initialized shards (#28049)
The primary currently replicates writes to all other shard copies as soon as they're added to the routing table. Initially those shards are not even ready yet to receive these replication requests, for example when undergoing a file-based peer recovery. Based on the specific stage that the shard copies are in, they will throw different kinds of exceptions when they receive the replication requests. The primary then ignores responses from shards that match certain exception types.

With this mechanism it's not possible for a primary to distinguish between a situation where a replication target shard is not allocated and ready yet to receive requests and a situation where the shard was successfully allocated and active but subsequently failed.

This commit changes replication so that only initializing shards that have successfully opened their engine are used as replication targets. This removes the need to replicate requests to initializing shards that are not even ready yet to receive those requests. This saves on network bandwidth and enables features that rely on the distinction between a "not-yet-ready" shard and a failed shard.
1 parent bc10334 commit 031415a
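
The selection rule the commit message describes can be sketched in a few lines. This is illustrative only, not the actual Elasticsearch API: the `ShardCopy` record and both helper methods are hypothetical stand-ins for the routing-table and replication-group machinery shown in the diffs below.

```java
import java.util.List;

class ReplicationTargetSelectionSketch {

    // Hypothetical stand-in for a shard copy; "engineOpened" models the point
    // at which peer recovery has opened the engine on the target.
    record ShardCopy(String allocationId, boolean assigned, boolean engineOpened) {}

    // Old behavior (sketch): every assigned copy received replicated writes,
    // ready or not, and the primary filtered the failures by exception type.
    static List<ShardCopy> targetsBefore(List<ShardCopy> copies) {
        return copies.stream().filter(ShardCopy::assigned).toList();
    }

    // New behavior (sketch): only copies whose recovery has opened the engine
    // are replication targets; the rest are skipped and merely counted.
    static List<ShardCopy> targetsAfter(List<ShardCopy> copies) {
        return copies.stream().filter(c -> c.assigned() && c.engineOpened()).toList();
    }
}
```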

File tree

20 files changed (+366, -193 lines)

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 12 additions & 20 deletions
```diff
@@ -117,19 +117,17 @@ public void execute() throws Exception {
             // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
             final long globalCheckpoint = primary.globalCheckpoint();
             final ReplicationGroup replicationGroup = primary.getReplicationGroup();
-            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
-            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
+            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
+            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
         }

         successfulShards.incrementAndGet(); // mark primary as successful
         decPendingAndFinishIfNeeded();
     }

-    private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<String> inSyncAllocationIds,
-                                              IndexShardRoutingTable indexShardRoutingTable) {
+    private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
         // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
-        for (String allocationId : Sets.difference(inSyncAllocationIds, indexShardRoutingTable.getAllAllocationIds())) {
-            // mark copy as stale
+        for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
             pendingActions.incrementAndGet();
             replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
                 ReplicationOperation.this::decPendingAndFinishIfNeeded,
@@ -140,22 +138,16 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<Str
     }

     private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
-                                   final IndexShardRoutingTable indexShardRoutingTable) {
-        final String localNodeId = primary.routingEntry().currentNodeId();
-        // If the index gets deleted after primary operation, we skip replication
-        for (final ShardRouting shard : indexShardRoutingTable) {
-            if (shard.unassigned()) {
-                assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard;
-                totalShards.incrementAndGet();
-                continue;
-            }
+                                   final ReplicationGroup replicationGroup) {
+        // for total stats, add number of unassigned shards and
+        // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
+        totalShards.addAndGet(replicationGroup.getSkippedShards().size());

-            if (shard.currentNodeId().equals(localNodeId) == false) {
-                performOnReplica(shard, replicaRequest, globalCheckpoint);
-            }
+        final ShardRouting primaryRouting = primary.routingEntry();

-            if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) {
-                performOnReplica(shard.getTargetRelocatingShard(), replicaRequest, globalCheckpoint);
+        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
+            if (shard.isSameAllocation(primaryRouting) == false) {
+                performOnReplica(shard, replicaRequest, globalCheckpoint);
             }
         }
     }
```
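
The new calls above lean on three derived views exposed by `ReplicationGroup`: `getUnavailableInSyncShards()`, `getReplicationTargets()`, and `getSkippedShards()`. That class is not part of this hunk; the sketch below shows one plausible derivation of those sets from the constructor arguments visible in `calculateReplicationGroup()` further down, with a simplified `Routing` record standing in for `ShardRouting`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplicationGroupSketch {

    // Simplified stand-in for ShardRouting, for illustration only.
    record Routing(String allocationId, boolean assigned) {}

    private final Set<String> unavailableInSyncShards; // in-sync but absent from the routing table: mark stale
    private final List<Routing> replicationTargets = new ArrayList<>(); // assigned and tracked: receive writes
    private final List<Routing> skippedShards = new ArrayList<>();      // unassigned or untracked: counted only

    ReplicationGroupSketch(List<Routing> routingTable, Set<String> inSyncIds, Set<String> trackedIds) {
        Set<String> routedIds = new HashSet<>();
        for (Routing shard : routingTable) {
            if (shard.assigned() == false) {
                skippedShards.add(shard);      // unassigned: never a write target
                continue;
            }
            routedIds.add(shard.allocationId());
            if (trackedIds.contains(shard.allocationId())) {
                replicationTargets.add(shard); // engine opened on the target
            } else {
                skippedShards.add(shard);      // initializing, engine not open yet
            }
        }
        unavailableInSyncShards = new HashSet<>(inSyncIds);
        unavailableInSyncShards.removeAll(routedIds); // replaces the old Sets.difference(...) computation
    }

    Set<String> getUnavailableInSyncShards() { return unavailableInSyncShards; }
    List<Routing> getReplicationTargets()    { return replicationTargets; }
    List<Routing> getSkippedShards()         { return skippedShards; }
}
```

Note that in the diff the primary's own routing entry can appear among the targets, which is why `performOnReplicas` still filters with `isSameAllocation(primaryRouting)`.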

server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java renamed to server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 50 additions & 15 deletions
```diff
@@ -21,6 +21,7 @@

 import com.carrotsearch.hppc.ObjectLongHashMap;
 import com.carrotsearch.hppc.ObjectLongMap;
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -48,15 +49,17 @@
 import java.util.stream.LongStream;

 /**
- * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
- * equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
+ * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
+ *
+ * The global checkpoint is the highest sequence number for which all lower (or equal) sequence number have been processed
+ * on all shards that are currently active. Since shards count as "active" when the master starts
  * them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
  * have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of
  * shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
  * <p>
  * The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
  */
-public class GlobalCheckpointTracker extends AbstractIndexShardComponent implements LongSupplier {
+public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier {

     /**
      * The allocation ID for the shard to which this tracker is a component of.
@@ -146,30 +149,49 @@ public static class CheckpointState implements Writeable {
          */
         boolean inSync;

-        public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync) {
+        /**
+         * whether this shard is tracked in the replication group, i.e., should receive document updates from the primary.
+         */
+        boolean tracked;
+
+        public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) {
             this.localCheckpoint = localCheckpoint;
             this.globalCheckpoint = globalCheckpoint;
             this.inSync = inSync;
+            this.tracked = tracked;
         }

         public CheckpointState(StreamInput in) throws IOException {
             this.localCheckpoint = in.readZLong();
             this.globalCheckpoint = in.readZLong();
             this.inSync = in.readBoolean();
+            if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
+                this.tracked = in.readBoolean();
+            } else {
+                // Every in-sync shard copy is also tracked (see invariant). This was the case even in earlier ES versions.
+                // Non in-sync shard copies might be tracked or not. As this information here is only serialized during relocation hand-off,
+                // after which replica recoveries cannot complete anymore (i.e. they cannot move from in-sync == false to in-sync == true),
+                // we can treat non in-sync replica shard copies as untracked. They will go through a fresh recovery against the new
+                // primary and will become tracked again under this primary before they are marked as in-sync.
+                this.tracked = inSync;
+            }
         }

         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeZLong(localCheckpoint);
             out.writeZLong(globalCheckpoint);
             out.writeBoolean(inSync);
+            if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
+                out.writeBoolean(tracked);
+            }
         }

         /**
          * Returns a full copy of this object
          */
         public CheckpointState copy() {
-            return new CheckpointState(localCheckpoint, globalCheckpoint, inSync);
+            return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked);
         }

         public long getLocalCheckpoint() {
@@ -186,6 +208,7 @@ public String toString() {
                 "localCheckpoint=" + localCheckpoint +
                 ", globalCheckpoint=" + globalCheckpoint +
                 ", inSync=" + inSync +
+                ", tracked=" + tracked +
                 '}';
         }

@@ -198,14 +221,16 @@ public boolean equals(Object o) {

             if (localCheckpoint != that.localCheckpoint) return false;
             if (globalCheckpoint != that.globalCheckpoint) return false;
-            return inSync == that.inSync;
+            if (inSync != that.inSync) return false;
+            return tracked == that.tracked;
         }

         @Override
         public int hashCode() {
             int result = Long.hashCode(localCheckpoint);
             result = 31 * result + Long.hashCode(globalCheckpoint);
             result = 31 * result + Boolean.hashCode(inSync);
+            result = 31 * result + Boolean.hashCode(tracked);
             return result;
         }
     }
@@ -301,6 +326,9 @@ private boolean invariant() {
             // blocking global checkpoint advancement only happens for shards that are not in-sync
             assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync :
                 "shard copy " + entry.getKey() + " blocks global checkpoint advancement but is in-sync";
+            // in-sync shard copies are tracked
+            assert !entry.getValue().inSync || entry.getValue().tracked :
+                "shard copy " + entry.getKey() + " is in-sync but not tracked";
         }

         return true;
@@ -330,7 +358,7 @@ private static long inSyncCheckpointStates(
      * @param indexSettings the index settings
      * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
      */
-    public GlobalCheckpointTracker(
+    public ReplicationTracker(
             final ShardId shardId,
             final String allocationId,
             final IndexSettings indexSettings,
@@ -342,7 +370,7 @@ public GlobalCheckpointTracker(
         this.handoffInProgress = false;
         this.appliedClusterStateVersion = -1L;
         this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
-        checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false));
+        checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;
@@ -361,7 +389,8 @@ public ReplicationGroup getReplicationGroup() {

     private ReplicationGroup calculateReplicationGroup() {
         return new ReplicationGroup(routingTable,
-            checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()));
+            checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
+            checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()));
     }

     /**
@@ -481,7 +510,7 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
                     final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
                         SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
                     final long globalCheckpoint = localCheckpoint;
-                    checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync));
+                    checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
                 }
             }
         } else {
@@ -490,18 +519,20 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
                 final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
                     SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
                 final long globalCheckpoint = localCheckpoint;
-                checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false));
+                checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
             }
         }
         for (String inSyncId : inSyncAllocationIds) {
             if (shardAllocationId.equals(inSyncId)) {
                 // current shard is initially marked as not in-sync because we don't know better at that point
-                checkpoints.get(shardAllocationId).inSync = true;
+                CheckpointState checkpointState = checkpoints.get(shardAllocationId);
+                checkpointState.inSync = true;
+                checkpointState.tracked = true;
             } else {
                 final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ?
                     SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
                 final long globalCheckpoint = localCheckpoint;
-                checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true));
+                checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
             }
         }
     }
@@ -516,19 +547,22 @@ public synchronized void updateFromMaster(final long applyingClusterStateVersion
     }

     /**
-     * Called when the recovery process for a shard is ready to open the engine on the target shard. Ensures that the right data structures
-     * have been set up locally to track local checkpoint information for the shard.
+     * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
+     * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.
      *
      * @param allocationId the allocation ID of the shard for which recovery was initiated
      */
     public synchronized void initiateTracking(final String allocationId) {
         assert invariant();
         assert primaryMode;
+        assert handoffInProgress == false;
         CheckpointState cps = checkpoints.get(allocationId);
         if (cps == null) {
             // can happen if replica was removed from cluster but recovery process is unaware of it yet
             throw new IllegalStateException("no local checkpoint tracking information available");
         }
+        cps.tracked = true;
+        replicationGroup = calculateReplicationGroup();
         assert invariant();
     }

@@ -551,6 +585,7 @@ public synchronized void markAllocationIdAsInSync(final String allocationId, fin
         assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED :
             "expected known local checkpoint for " + allocationId + " but was " + localCheckpoint;
         assert pendingInSync.contains(allocationId) == false : "shard copy " + allocationId + " is already marked as pending in-sync";
+        assert cps.tracked : "shard copy " + allocationId + " cannot be marked as in-sync as it's not tracked";
         updateLocalCheckpoint(allocationId, cps, localCheckpoint);
         // if it was already in-sync (because of a previously failed recovery attempt), global checkpoint must have been
         // stuck from advancing
```
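
Read together, the assertions above pin down an ordering for replica recovery: a copy must become tracked (engine opened) before it can be marked in-sync. Below is a condensed, hypothetical driver showing that ordering; only the two tracker method names come from the diff, everything else is illustrative scaffolding.

```java
class RecoveryFlowSketch {

    // Subset of the tracker surface used by this sketch.
    interface Tracker {
        void initiateTracking(String allocationId);
        void markAllocationIdAsInSync(String allocationId, long localCheckpoint) throws InterruptedException;
    }

    void recoverReplica(Tracker tracker, String allocationId) throws InterruptedException {
        copyFiles();                                  // phase 1: file-based copy; the copy is untracked,
                                                      // so the primary skips it for replicated writes
        openEngineOnTarget();                         // phase 2: the target opens its engine...
        tracker.initiateTracking(allocationId);       // ...and only now becomes a replication target

        long localCheckpoint = replayRemainingOps();  // phase 3: catch up via ops replay
        tracker.markAllocationIdAsInSync(allocationId, localCheckpoint); // asserts cps.tracked
    }

    // Hypothetical no-op stubs for the recovery phases.
    private void copyFiles() {}
    private void openEngineOnTarget() {}
    private long replayRemainingOps() { return 0L; }
}
```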
