Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recover retention leases during peer recovery 6.x #38478

Merged
merged 2 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -3052,7 +3053,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,17 +515,21 @@ public void onTimeout(TimeValue timeout) {
}
});
};
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
e -> {
if (e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);
}
})
recoveryTarget.indexTranslogOperations(
request.operations(),
request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(),
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
request.retentionLeases(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
e -> {
if (e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);
}
})
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -231,8 +232,16 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
final RetentionLeases retentionLeases = shard.getRetentionLeases();
phase2(
startingSeqNo,
requiredSeqNoRangeStart,
endingSeqNo,
phase2Snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> {
Expand Down Expand Up @@ -517,8 +526,15 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @param retentionLeases the retention leases passed to the target along with each batch of operations
* @param listener a listener which will be notified with the local checkpoint on the target.
*/
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
void phase2(
final long startingSeqNo,
final long requiredSeqNoRangeStart,
final long endingSeqNo,
final Translog.Snapshot snapshot,
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final ActionListener<SendSnapshotResult> listener) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
Expand Down Expand Up @@ -584,25 +600,50 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
listener::onFailure
);

sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(),
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener);
sendBatch(
readNextBatch,
true,
SequenceNumbers.UNASSIGNED_SEQ_NO,
snapshot.totalOperations(),
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
batchedListener);
}

private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch,
long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) throws IOException {
private void sendBatch(
final CheckedSupplier<List<Translog.Operation>, IOException> nextBatch,
final boolean firstBatch,
final long targetLocalCheckpoint,
final int totalTranslogOps,
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final ActionListener<Long> listener) throws IOException {
final List<Translog.Operation> operations = nextBatch.get();
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
if (operations.isEmpty() == false || firstBatch) {
cancellableThreads.execute(() -> {
recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener);
},
listener::onFailure
));
recoveryTarget.indexTranslogOperations(
operations,
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
nextBatch,
false,
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
listener);
},
listener::onFailure
));
});
} else {
listener.onResponse(targetLocalCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
Expand Down Expand Up @@ -400,8 +401,13 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
public void indexTranslogOperations(
final List<Translog.Operation> operations,
final int totalTranslogOps,
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
final RetentionLeases retentionLeases,
final ActionListener<Long> listener) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
Expand All @@ -421,6 +427,11 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
/*
* We have to update the retention leases before we start applying translog operations to ensure we are retaining according to
* the policy.
*/
indexShard().updateRetentionLeasesOnReplica(retentionLeases);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
Expand All @@ -39,8 +40,8 @@ public interface RecoveryTargetHandler {
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);

/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
* updates the global checkpoint.
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
* updates the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
* @param listener the listener which will be notified when this method is completed
Expand Down Expand Up @@ -68,11 +69,17 @@ public interface RecoveryTargetHandler {
* @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
* the primary shard when capturing these operations. This value is at least as high as the
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
* @param retentionLeases the retention leases on the primary
* @param listener a listener which will be notified with the local checkpoint on the target
* after these operations are successfully indexed on the target.
*/
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);
void indexTranslogOperations(
List<Translog.Operation> operations,
int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfUpdatesOrDeletesOnPrimary,
RetentionLeases retentionLeases,
ActionListener<Long> listener);

/**
* Notifies the target of the files it is going to receive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
Expand All @@ -39,18 +40,26 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private long maxSeenAutoIdTimestampOnPrimary;
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
private RetentionLeases retentionLeases;

/**
 * Creates a request without setting any state; every field keeps its declared default.
 * NOTE(review): presumably used when the request is deserialized and then populated via {@code readFrom(StreamInput)} - confirm
 * against the transport registration.
 */
public RecoveryTranslogOperationsRequest() {
}

RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) {
RecoveryTranslogOperationsRequest(
final long recoveryId,
final ShardId shardId,
final List<Translog.Operation> operations,
final int totalTranslogOps,
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
final RetentionLeases retentionLeases) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;
this.totalTranslogOps = totalTranslogOps;
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
this.retentionLeases = retentionLeases;
}

public long recoveryId() {
Expand All @@ -77,6 +86,10 @@ public long maxSeqNoOfUpdatesOrDeletesOnPrimary() {
return maxSeqNoOfUpdatesOrDeletesOnPrimary;
}

/**
 * Returns the retention leases carried by this request.
 *
 * @return the retention leases given to the constructor or, for a deserialized request, the leases read from the stream
 *         ({@code RetentionLeases.EMPTY} when the sending node's version is before 6.7.0)
 */
public RetentionLeases retentionLeases() {
return retentionLeases;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -95,6 +108,11 @@ public void readFrom(StreamInput in) throws IOException {
// UNASSIGNED_SEQ_NO means uninitialized and replica won't enable optimization using seq_no
maxSeqNoOfUpdatesOrDeletesOnPrimary = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
retentionLeases = new RetentionLeases(in);
} else {
retentionLeases = RetentionLeases.EMPTY;
}
}

@Override
Expand All @@ -110,5 +128,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary);
}
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
retentionLeases.writeTo(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
Expand Down Expand Up @@ -113,10 +114,21 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
public void indexTranslogOperations(
final List<Translog.Operation> operations,
final int totalTranslogOps,
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
final RetentionLeases retentionLeases,
final ActionListener<Long> listener) {
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary);
recoveryId,
shardId,
operations,
totalTranslogOps,
maxSeenAutoIdTimestampOnPrimary,
maxSeqNoOfDeletesOrUpdatesOnPrimary,
retentionLeases);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure),
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));
Expand Down
Loading