Closed

Changes from all commits
38 commits
4d4b0df
Retain history for peer recovery using leases
DaveCTurner Feb 19, 2019
1db54e6
Imports
DaveCTurner Feb 19, 2019
fa27f06
Merge branch 'master' into 2019-02-19-soft-deletes-history-retention-WIP
DaveCTurner Feb 19, 2019
bfaf734
No need for this
DaveCTurner Feb 19, 2019
e9971f9
Compile error
DaveCTurner Feb 19, 2019
316c4ed
contains -> hasItem
DaveCTurner Feb 20, 2019
98e2708
Must renew leases now
DaveCTurner Feb 20, 2019
4241335
Now renew/sync returns a future so that tests can wait
DaveCTurner Feb 20, 2019
5d2f50a
WIP on CcrRetentionLeaseIT
DaveCTurner Feb 21, 2019
4d960d3
Merge branch 'master' into 2019-02-19-soft-deletes-history-retention
DaveCTurner Feb 25, 2019
08a6831
We already set waitForActiveShards
DaveCTurner Feb 25, 2019
d2ab443
Imports
DaveCTurner Feb 25, 2019
4cae8c5
More imports
DaveCTurner Feb 25, 2019
6639a57
Imports
DaveCTurner Feb 25, 2019
fdd1c97
Imports
DaveCTurner Feb 25, 2019
63fb818
Revert
DaveCTurner Feb 25, 2019
12064ac
Set up initial replication targets for retention lease syncing
DaveCTurner Feb 25, 2019
8379650
Fix new test
DaveCTurner Feb 25, 2019
d570746
hasItem
DaveCTurner Feb 25, 2019
884d84e
Fix testSnapshotFileFailureDuringSnapshot
DaveCTurner Feb 25, 2019
7b50472
Merge branch '2019-02-25-testSnapshotFileFailureDuringSnapshot-root-c…
DaveCTurner Feb 25, 2019
e604567
Merge branch 'master' into 2019-02-19-soft-deletes-history-retention
DaveCTurner Feb 25, 2019
fd71984
Finer assertion
DaveCTurner Feb 25, 2019
ee3efa7
Imports
DaveCTurner Feb 25, 2019
b2ec2d0
Fix up CcrRetentionLeaseIT
DaveCTurner Feb 26, 2019
55adaeb
Merge branch 'master' into 2019-02-19-soft-deletes-history-retention-TMP
DaveCTurner Feb 27, 2019
30cf6d4
Fix up test
DaveCTurner Feb 27, 2019
6709175
WIP sharing LCPoSC with every replication response
DaveCTurner Feb 27, 2019
2395b7a
WIP continues
DaveCTurner Feb 27, 2019
fa6ae12
More WIP
DaveCTurner Feb 27, 2019
9b45156
Imports
DaveCTurner Feb 27, 2019
ed18d5c
Weaken retention leases assertion so it only applies in supported ver…
DaveCTurner Mar 1, 2019
6f8f48b
Create missing leases
DaveCTurner Mar 1, 2019
7064a6c
WIP fixing ReplicationTrackerTests
DaveCTurner Mar 1, 2019
1ae5061
Merge branch 'master' into 2019-02-19-soft-deletes-history-retention
DaveCTurner Mar 20, 2019
3e01465
Test fixes
DaveCTurner Mar 20, 2019
684e395
Allow retention lease syncs to happen even on closed indices
DaveCTurner Mar 21, 2019
47ed955
Imports
DaveCTurner Mar 21, 2019
@@ -104,6 +104,7 @@ public void execute() throws Exception {
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primaryResult = primary.perform(request);
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
primary.updateLocalCheckpointOfSafeCommitForShard(primaryRouting.allocationId().getId(), primary.localCheckpointOfSafeCommit());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
@@ -170,6 +171,7 @@ public void onResponse(ReplicaResponse response) {
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
primary.updateLocalCheckpointOfSafeCommitForShard(shard.allocationId().getId(), response.localCheckpointOfSafeCommit());
} catch (final AlreadyClosedException e) {
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
@@ -334,6 +336,14 @@ public interface Primary<
*/
void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint);

/**
* Update the local knowledge of the local checkpoint of the safe commit for the specified allocation ID.
*
* @param allocationId the allocation ID to update the local checkpoint of the safe commit for
* @param localCheckpointOfSafeCommit the local checkpoint of the safe commit
*/
void updateLocalCheckpointOfSafeCommitForShard(String allocationId, long localCheckpointOfSafeCommit);

/**
* Returns the local checkpoint on the primary shard.
*
@@ -348,6 +358,13 @@ public interface Primary<
*/
long globalCheckpoint();

/**
* Returns the local checkpoint of the safe commit on the primary shard.
*
* @return the local checkpoint of the safe commit
*/
long localCheckpointOfSafeCommit();

/**
* Returns the maximum seq_no of updates (index operations overwrite Lucene) or deletes on the primary.
* This value must be captured after the execution of a replication request on the primary is completed.
@@ -423,6 +440,12 @@ public interface ReplicaResponse {
**/
long globalCheckpoint();

/**
* The local checkpoint of the safe commit for the shard copy.
*
* @return the local checkpoint of the safe commit
**/
long localCheckpointOfSafeCommit();
}

public static class RetryOnPrimaryException extends ElasticsearchException {
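The hunks above (apparently from ReplicationOperation.java) thread a new value, the local checkpoint of the safe commit, from each shard copy's replication response back to the primary through updateLocalCheckpointOfSafeCommitForShard. As a rough sketch of how such per-copy values might be consumed, the standalone class below tracks the reported checkpoints and exposes their minimum, which bounds how far back history must be retained for operations-based peer recovery; all names in it are invented for illustration and it is not the Elasticsearch implementation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch only: the class, field, and method names are invented; the real
// logic lives in the ReplicationTracker / ReplicationOperation machinery in the diff.
class SafeCommitCheckpointTracker {

    // sentinel matching SequenceNumbers.UNASSIGNED_SEQ_NO used in the diff above
    private static final long UNASSIGNED_SEQ_NO = -2;

    // allocation ID -> highest local checkpoint of the safe commit reported by that copy
    private final Map<String, Long> safeCommitCheckpoints = new ConcurrentHashMap<>();

    // called whenever a replication response reports a copy's safe-commit checkpoint,
    // mirroring updateLocalCheckpointOfSafeCommitForShard in the diff
    void updateLocalCheckpointOfSafeCommitForShard(String allocationId, long localCheckpointOfSafeCommit) {
        // responses can arrive out of order, so only ever move the tracked value forwards
        safeCommitCheckpoints.merge(allocationId, localCheckpointOfSafeCommit, Math::max);
    }

    // operations at or below the lowest safe-commit checkpoint of every tracked copy are
    // already contained in each copy's safe commit, so they need not be retained as history
    long minimumSafeCommitCheckpoint() {
        return safeCommitCheckpoints.values().stream()
            .filter(v -> v != UNASSIGNED_SEQ_NO) // copies on older versions report no value
            .mapToLong(Long::longValue)
            .min()
            .orElse(UNASSIGNED_SEQ_NO);
    }
}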
@@ -23,6 +23,7 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
@@ -231,6 +232,13 @@ protected boolean resolveIndex() {
return true;
}

/**
* True if this action can be replicated even on closed indices.
*/
protected boolean isReplicatedOnClosedIndices() {
return false;
}

protected TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}
@@ -618,7 +626,8 @@ public void onResponse(Releasable releasable) {
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint());
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint(),
replica.getLocalCheckpointOfSafeCommit());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
@@ -773,7 +782,7 @@ protected void doRun() {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
if (isReplicatedOnClosedIndices() == false && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}

@@ -1045,6 +1054,11 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
indexShard.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}

@Override
public void updateLocalCheckpointOfSafeCommitForShard(final String allocationId, final long localCheckpointOfSafeCommit) {
indexShard.updateLocalCheckpointOfSafeCommitForShard(allocationId, localCheckpointOfSafeCommit);
}

@Override
public long localCheckpoint() {
return indexShard.getLocalCheckpoint();
@@ -1055,6 +1069,11 @@ public long globalCheckpoint() {
return indexShard.getGlobalCheckpoint();
}

@Override
public long localCheckpointOfSafeCommit() {
return indexShard.getLocalCheckpointOfSafeCommit();
}

@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return indexShard.getMaxSeqNoOfUpdatesOrDeletes();
@@ -1070,12 +1089,13 @@ public ReplicationGroup getReplicationGroup() {
public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
private long localCheckpoint;
private long globalCheckpoint;
private long localCheckpointOfSafeCommit;

ReplicaResponse() {

}

public ReplicaResponse(long localCheckpoint, long globalCheckpoint) {
public ReplicaResponse(long localCheckpoint, long globalCheckpoint, long localCheckpointOfSafeCommit) {
/*
* A replica should always know its own local checkpoints so this should always be a valid sequence number or the pre-6.0
* checkpoint value when simulating responses to replication actions that pre-6.0 nodes are not aware of (e.g., the global
@@ -1084,20 +1104,29 @@ public ReplicaResponse(long localCheckpoint, long globalCheckpoint) {
assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
this.localCheckpoint = localCheckpoint;
this.globalCheckpoint = globalCheckpoint;
this.localCheckpointOfSafeCommit = localCheckpointOfSafeCommit;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
localCheckpoint = in.readZLong();
globalCheckpoint = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO V_7_0_0 for backport
localCheckpointOfSafeCommit = in.readZLong();
} else {
localCheckpointOfSafeCommit = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(localCheckpoint);
out.writeZLong(globalCheckpoint);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO V_7_0_0 for backport
out.writeZLong(localCheckpointOfSafeCommit);
}
}

@Override
@@ -1110,18 +1139,24 @@ public long globalCheckpoint() {
return globalCheckpoint;
}

@Override
public long localCheckpointOfSafeCommit() {
return localCheckpointOfSafeCommit;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReplicaResponse that = (ReplicaResponse) o;
return localCheckpoint == that.localCheckpoint &&
globalCheckpoint == that.globalCheckpoint;
globalCheckpoint == that.globalCheckpoint &&
localCheckpointOfSafeCommit == that.localCheckpointOfSafeCommit;
}

@Override
public int hashCode() {
return Objects.hash(localCheckpoint, globalCheckpoint);
return Objects.hash(localCheckpoint, globalCheckpoint, localCheckpointOfSafeCommit);
}
}

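Two things stand out in the hunks above (apparently from TransportReplicationAction.java): the isReplicatedOnClosedIndices() hook lets specific actions, such as retention lease syncs, run against closed indices, and ReplicaResponse gains version-gated serialization so the new field is only exchanged with nodes that understand it. Below is a minimal, self-contained sketch of that second pattern, assuming plain java.io streams; the class, the version constant, and the sentinel are illustrative stand-ins rather than the real StreamInput/StreamOutput API.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal sketch of the wire-compatibility pattern: the new field is only (de)serialized
// when the remote node's version knows about it, and older peers get a sentinel default.
class VersionedReplicaResponse {

    static final long UNASSIGNED_SEQ_NO = -2;            // default when the peer never sends the field
    static final int VERSION_WITH_SAFE_COMMIT = 8_00_00_00; // hypothetical first version carrying it

    long localCheckpoint;
    long globalCheckpoint;
    long localCheckpointOfSafeCommit = UNASSIGNED_SEQ_NO;

    void writeTo(DataOutputStream out, int remoteVersion) throws IOException {
        out.writeLong(localCheckpoint);
        out.writeLong(globalCheckpoint);
        if (remoteVersion >= VERSION_WITH_SAFE_COMMIT) {
            // only peers that know about the field expect it on the wire
            out.writeLong(localCheckpointOfSafeCommit);
        }
    }

    void readFrom(DataInputStream in, int remoteVersion) throws IOException {
        localCheckpoint = in.readLong();
        globalCheckpoint = in.readLong();
        // older peers never wrote the field, so fall back to the sentinel
        localCheckpointOfSafeCommit =
            remoteVersion >= VERSION_WITH_SAFE_COMMIT ? in.readLong() : UNASSIGNED_SEQ_NO;
    }
}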
51 changes: 48 additions & 3 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
@@ -26,6 +26,7 @@
import org.apache.lucene.util.Accountable;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -88,6 +89,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@@ -122,6 +124,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
private volatile AsyncPeerRecoveryRetentionLeaseRenewalTask peerRecoveryRetentionLeaseRenewalTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -199,6 +202,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.peerRecoveryRetentionLeaseRenewalTask = new AsyncPeerRecoveryRetentionLeaseRenewalTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}

@@ -289,7 +293,8 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
fsyncTask,
trimTranslogTask,
globalCheckpointTask,
retentionLeaseSyncTask);
retentionLeaseSyncTask,
peerRecoveryRetentionLeaseRenewalTask);
}
}
}
@@ -317,8 +322,10 @@ private long getAvgShardSizeInBytes() throws IOException {
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
final RetentionLeaseSyncer retentionLeaseSyncer,
final BiConsumer<ShardId, ActionListener<Void>> peerRecoveryRetentionLeaseRenewer) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
Objects.requireNonNull(peerRecoveryRetentionLeaseRenewer);
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
@@ -408,7 +415,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService);
circuitBreakerService,
listener -> peerRecoveryRetentionLeaseRenewer.accept(shardId, listener));
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
@@ -795,6 +803,10 @@ private void syncRetentionLeases() {
}
}

private void renewPeerRecoveryRetentionLeases() {
sync(IndexShard::renewPeerRecoveryRetentionLeases, "peer recovery retention leases");
}

private void sync(final Consumer<IndexShard> sync, final String source) {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
@@ -937,6 +949,15 @@ public String toString() {
Property.Dynamic,
Property.IndexScope);

// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_PEER_RECOVERY_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.soft_deletes.retention_lease.peer_recovery.sync_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope);

/**
* Background task that syncs the global checkpoint to replicas.
*/
@@ -986,6 +1007,30 @@ public String toString() {

}

final class AsyncPeerRecoveryRetentionLeaseRenewalTask extends BaseAsyncTask {

// TODO perhaps we can just piggyback on the synced flush that happens after 5 minutes of inactivity instead?

AsyncPeerRecoveryRetentionLeaseRenewalTask(final IndexService indexService) {
super(indexService, RETENTION_LEASE_PEER_RECOVERY_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
}

@Override
protected void runInternal() {
indexService.renewPeerRecoveryRetentionLeases();
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.MANAGEMENT;
}

@Override
public String toString() {
return "peer_recovery_retention_lease_sync";
}
}

AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}
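The IndexService.java changes register AsyncPeerRecoveryRetentionLeaseRenewalTask, scheduled via the unregistered, test-only index.soft_deletes.retention_lease.peer_recovery.sync_interval setting (default five minutes), to call renewPeerRecoveryRetentionLeases() on every active primary of the index. The sketch below shows the same periodic-renewal shape, assuming a plain ScheduledExecutorService rather than Elasticsearch's BaseAsyncTask/ThreadPool machinery; the Renewer interface and class name are invented for the example.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: a background task that renews peer-recovery retention leases on a
// fixed cadence so that leases held for temporarily-offline copies do not lapse.
class PeerRecoveryLeaseRenewalTask implements AutoCloseable {

    interface Renewer {
        void renewPeerRecoveryRetentionLeases();
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledFuture<?> renewal;

    PeerRecoveryLeaseRenewalTask(Renewer renewer, long intervalMillis) {
        // renew repeatedly at the configured interval (the diff defaults to five minutes)
        renewal = scheduler.scheduleWithFixedDelay(
            renewer::renewPeerRecoveryRetentionLeases, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        renewal.cancel(false);
        scheduler.shutdown();
    }
}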
@@ -264,6 +264,8 @@ protected final DocsStats docsStats(IndexReader indexReader) {
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}

public abstract long getLocalCheckpointOfSafeCommit();

/**
* Performs the pre-closing checks on the {@link Engine}.
*
@@ -766,7 +768,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

Expand Down