Skip to content

Commit c728aea

Browse files
committed
Replicate max seq_no of updates to replicas (#33967)
We started tracking max_seq_no_of_updates on the primary in #33842. This commit replicates that value from a primary to its replicas in replication requests and in the translog phase of peer-recovery. With this change, we guarantee that the value of max_seq_no_of_updates on a replica when any index/delete operation is performed is at least the value of max_seq_no_of_updates on the primary when that operation was executed. Relates #33656
1 parent c709708 commit c728aea

File tree

25 files changed

+395
-127
lines changed

25 files changed

+395
-127
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.routing.ShardRouting;
3232
import org.elasticsearch.common.Nullable;
3333
import org.elasticsearch.common.io.stream.StreamInput;
34+
import org.elasticsearch.index.seqno.SequenceNumbers;
3435
import org.elasticsearch.index.shard.ReplicationGroup;
3536
import org.elasticsearch.index.shard.ShardId;
3637
import org.elasticsearch.rest.RestStatus;
@@ -114,9 +115,13 @@ public void execute() throws Exception {
114115
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
115116
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
116117
final long globalCheckpoint = primary.globalCheckpoint();
118+
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
119+
// max_seq_no_of_updates on the replica when this request is executed is at least the value on the primary when it was executed there.
120+
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
121+
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
117122
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
118123
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
119-
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
124+
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
120125
}
121126

122127
successfulShards.incrementAndGet(); // mark primary as successful
@@ -136,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
136141
}
137142

138143
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
139-
final ReplicationGroup replicationGroup) {
144+
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
140145
// for total stats, add number of unassigned shards and
141146
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
142147
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
@@ -145,19 +150,20 @@ private void performOnReplicas(final ReplicaRequest replicaRequest, final long g
145150

146151
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
147152
if (shard.isSameAllocation(primaryRouting) == false) {
148-
performOnReplica(shard, replicaRequest, globalCheckpoint);
153+
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
149154
}
150155
}
151156
}
152157

153-
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
158+
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
159+
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
154160
if (logger.isTraceEnabled()) {
155161
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
156162
}
157163

158164
totalShards.incrementAndGet();
159165
pendingActions.incrementAndGet();
160-
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
166+
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
161167
@Override
162168
public void onResponse(ReplicaResponse response) {
163169
successfulShards.incrementAndGet();
@@ -322,6 +328,12 @@ public interface Primary<
322328
*/
323329
long globalCheckpoint();
324330

331+
/**
332+
* Returns the maximum seq_no of updates (index operations overwriting Lucene) or deletes on the primary.
333+
* This value must be captured after the execution of a replication request on the primary is completed.
334+
*/
335+
long maxSeqNoOfUpdatesOrDeletes();
336+
325337
/**
326338
* Returns the current replication group on the primary shard
327339
*
@@ -338,12 +350,15 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
338350
/**
339351
* Performs the specified request on the specified replica.
340352
*
341-
* @param replica the shard this request should be executed on
342-
* @param replicaRequest the operation to perform
343-
* @param globalCheckpoint the global checkpoint on the primary
344-
* @param listener callback for handling the response or failure
353+
* @param replica the shard this request should be executed on
354+
* @param replicaRequest the operation to perform
355+
* @param globalCheckpoint the global checkpoint on the primary
356+
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
357+
* after this replication was executed on it.
358+
* @param listener callback for handling the response or failure
345359
*/
346-
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener<ReplicaResponse> listener);
360+
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
361+
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);
347362

348363
/**
349364
* Fail the specified shard if needed, removing it from the current set

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima
201201

202202
/**
203203
* Synchronously execute the specified replica operation. This is done under a permit from
204-
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
204+
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
205205
*
206206
* @param shardRequest the request to the replica shard
207207
* @param replica the replica shard to perform the operation on
@@ -506,6 +506,7 @@ public void messageReceived(
506506
replicaRequest.getTargetAllocationID(),
507507
replicaRequest.getPrimaryTerm(),
508508
replicaRequest.getGlobalCheckpoint(),
509+
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
509510
channel,
510511
(ReplicationTask) task).run();
511512
}
@@ -530,6 +531,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
530531
private final String targetAllocationID;
531532
private final long primaryTerm;
532533
private final long globalCheckpoint;
534+
private final long maxSeqNoOfUpdatesOrDeletes;
533535
private final TransportChannel channel;
534536
private final IndexShard replica;
535537
/**
@@ -545,6 +547,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
545547
String targetAllocationID,
546548
long primaryTerm,
547549
long globalCheckpoint,
550+
long maxSeqNoOfUpdatesOrDeletes,
548551
TransportChannel channel,
549552
ReplicationTask task) {
550553
this.request = request;
@@ -553,6 +556,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
553556
this.targetAllocationID = targetAllocationID;
554557
this.primaryTerm = primaryTerm;
555558
this.globalCheckpoint = globalCheckpoint;
559+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
556560
final ShardId shardId = request.shardId();
557561
assert shardId != null : "request shardId must be set";
558562
this.replica = getIndexShard(shardId);
@@ -592,7 +596,8 @@ public void onNewClusterState(ClusterState state) {
592596
new TransportChannelResponseHandler<>(logger, channel, extraMessage,
593597
() -> TransportResponse.Empty.INSTANCE);
594598
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
595-
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint),
599+
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
600+
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
596601
handler);
597602
}
598603

@@ -630,7 +635,7 @@ protected void doRun() throws Exception {
630635
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
631636
actualAllocationId);
632637
}
633-
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
638+
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
634639
}
635640

636641
/**
@@ -1040,6 +1045,11 @@ public long globalCheckpoint() {
10401045
return indexShard.getGlobalCheckpoint();
10411046
}
10421047

1048+
@Override
1049+
public long maxSeqNoOfUpdatesOrDeletes() {
1050+
return indexShard.getMaxSeqNoOfUpdatesOrDeletes();
1051+
}
1052+
10431053
@Override
10441054
public ReplicationGroup getReplicationGroup() {
10451055
return indexShard.getReplicationGroup();
@@ -1124,15 +1134,16 @@ public void performOn(
11241134
final ShardRouting replica,
11251135
final ReplicaRequest request,
11261136
final long globalCheckpoint,
1137+
final long maxSeqNoOfUpdatesOrDeletes,
11271138
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
11281139
String nodeId = replica.currentNodeId();
11291140
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
11301141
if (node == null) {
11311142
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
11321143
return;
11331144
}
1134-
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
1135-
new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
1145+
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
1146+
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
11361147
sendReplicaRequest(replicaRequest, node, listener);
11371148
}
11381149

@@ -1280,15 +1291,17 @@ public String toString() {
12801291
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
12811292

12821293
private long globalCheckpoint;
1294+
private long maxSeqNoOfUpdatesOrDeletes;
12831295

12841296
public ConcreteReplicaRequest(final Supplier<R> requestSupplier) {
12851297
super(requestSupplier);
12861298
}
12871299

12881300
public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
1289-
final long globalCheckpoint) {
1301+
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
12901302
super(request, targetAllocationID, primaryTerm);
12911303
this.globalCheckpoint = globalCheckpoint;
1304+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
12921305
}
12931306

12941307
@Override
@@ -1299,6 +1312,13 @@ public void readFrom(StreamInput in) throws IOException {
12991312
} else {
13001313
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
13011314
}
1315+
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
1316+
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
1317+
} else {
1318+
// UNASSIGNED_SEQ_NO (-2) means uninitialized, and replicas will disable
1319+
// optimization using seq_no if its max_seq_no_of_updates is still uninitialized
1320+
maxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
1321+
}
13021322
}
13031323

13041324
@Override
@@ -1307,19 +1327,27 @@ public void writeTo(StreamOutput out) throws IOException {
13071327
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
13081328
out.writeZLong(globalCheckpoint);
13091329
}
1330+
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
1331+
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
1332+
}
13101333
}
13111334

13121335
public long getGlobalCheckpoint() {
13131336
return globalCheckpoint;
13141337
}
13151338

1339+
public long getMaxSeqNoOfUpdatesOrDeletes() {
1340+
return maxSeqNoOfUpdatesOrDeletes;
1341+
}
1342+
13161343
@Override
13171344
public String toString() {
13181345
return "ConcreteReplicaRequest{" +
13191346
"targetAllocationID='" + getTargetAllocationID() + '\'' +
13201347
", primaryTerm='" + getPrimaryTerm() + '\'' +
13211348
", request=" + getRequest() +
13221349
", globalCheckpoint=" + globalCheckpoint +
1350+
", maxSeqNoOfUpdatesOrDeletes=" + maxSeqNoOfUpdatesOrDeletes +
13231351
'}';
13241352
}
13251353
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
10421042
if (plan.addStaleOpToLucene) {
10431043
addStaleDocs(index.docs(), indexWriter);
10441044
} else if (plan.useLuceneUpdateDocument) {
1045+
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), plan.seqNoForIndexing, true, true);
10451046
updateDocs(index.uid(), index.docs(), indexWriter);
10461047
} else {
10471048
// document does not exist, we can optimize for create, but double check if assertions are running
@@ -1355,8 +1356,8 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
13551356
return plan;
13561357
}
13571358

1358-
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
1359-
throws IOException {
1359+
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
1360+
assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), plan.seqNoOfDeletion, false, false);
13601361
try {
13611362
if (softDeleteEnabled) {
13621363
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
@@ -2636,6 +2637,29 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
26362637
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
26372638
}
26382639

2640+
private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) {
2641+
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
2642+
// If the primary is on an old version which does not replicate msu, we need to relax this assertion for that.
2643+
if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) {
2644+
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_5_0);
2645+
return true;
2646+
}
2647+
// We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument).
2648+
if (allowDeleted) {
2649+
final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes());
2650+
if (versionValue != null && versionValue.isDelete()) {
2651+
return true;
2652+
}
2653+
}
2654+
// Operations can be processed on a replica in a different order than on the primary. If the order on the primary is index-1,
2655+
// delete-2, index-3, and the order on a replica is index-1, index-3, delete-2, then the msu of index-3 on the replica is 2
2656+
// even though it is an update (overwrites index-1). We should relax this assertion if there is a pending gap in the seq_no.
2657+
if (relaxIfGapInSeqNo && getLocalCheckpoint() < maxSeqNoOfUpdates) {
2658+
return true;
2659+
}
2660+
assert seqNo <= maxSeqNoOfUpdates : "id=" + id + " seq_no=" + seqNo + " msu=" + maxSeqNoOfUpdates;
2661+
return true;
2662+
}
26392663

26402664
@Override
26412665
public void initializeMaxSeqNoOfUpdatesOrDeletes() {

0 commit comments

Comments
 (0)