
Create peer-recovery retention leases #43190

Conversation

DaveCTurner
Copy link
Contributor

This creates a peer-recovery retention lease for every shard during recovery,
ensuring that the replication group retains history for future peer recoveries.
It also ensures that leases for active shard copies do not expire, and leases
for inactive shard copies expire immediately if the shard is fully-allocated.

Relates #41536

@DaveCTurner DaveCTurner added the >enhancement and :Distributed Indexing/Recovery (Anything around constructing a new shard, either from a local or a remote source) labels Jun 13, 2019
@DaveCTurner DaveCTurner requested review from ywelsch and dnhatn June 13, 2019 09:05
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

Contributor

@ywelsch ywelsch left a comment

I've done a first pass and left some questions, mainly to get a better understanding of the scope of the change.

@@ -260,7 +261,9 @@ public void testForgetFollower() throws IOException {
final Map<?, ?> shardStatsAsMap = (Map<?, ?>) shardStats.get(0);
final Map<?, ?> retentionLeasesStats = (Map<?, ?>) shardStatsAsMap.get("retention_leases");
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
assertThat(leases, empty());
for (final Object lease : leases) {
assertThat(((Map<?, ?>) lease).get("source"), equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE));
Contributor

can we instead assert the absence of CCR leases?

Contributor Author

Not as robustly as I'd like, no. We could say there are no leases with source "ccr", but that's a lot weaker than asserting that the only remaining leases are PRRLs, similar to how we previously asserted that there were no leases at all.
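For illustration, the two strengths of assertion being compared look roughly like this (a sketch reusing the names from the quoted test; hamcrest's not/equalTo matchers are assumed to be statically imported, and "ccr" is taken as the CCR lease source string):

```java
// Weaker form: only rules out CCR leases, says nothing about what else might remain.
for (final Object lease : leases) {
    assertThat(((Map<?, ?>) lease).get("source"), not(equalTo("ccr")));
}

// Stronger form (as in the change): every remaining lease must be a peer-recovery retention lease.
for (final Object lease : leases) {
    assertThat(((Map<?, ?>) lease).get("source"),
        equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE));
}
```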

Member

Can we use toMapExcludingPeerRecoveryRetentionLeases here?

Contributor Author

Not very easily. Here we are on the other side of the high-level REST API, and this doesn't include indices stats, so we don't have access to a RetentionLeases object. It would be quite some work to build one.

Member

Ah, I did not realize that it's a REST test.

@@ -669,6 +759,7 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
Contributor

is this to catch issues where tests have not been properly set up?

Contributor Author

Yes, if this is unset then the crucial assertions are skipped, which is Very Bad™.
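For context, a minimal sketch of the test setup this assertion guards against (the setting key is the standard index-created-version setting; the surrounding builder is illustrative only, not the committed code):

```java
// If a test builds its IndexSettings without a created-version, getIndexVersionCreated()
// returns Version.V_EMPTY and the version-gated assertions in ReplicationTracker would be
// skipped silently. Setting it explicitly keeps the assertion above satisfied.
final Settings settings = Settings.builder()
    .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
    .build();
```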

if (retentionLeases.get(leaseId) == null) {
/*
* We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
Contributor

this might also be a recovery from store?
What about when we become primary due to a primary relocation? Do we need to do this as well?

Contributor Author

This comment is explaining the following if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)). Covering the cases when the index was created in an earlier version is out of scope here.

In a primary relocation the new primary, being a tracked replica, already has a lease.

Member

Covering the cases when the index was created in an earlier version is out of scope here.

Did you mean there will be another change here? Why don't we do it now ;). The relocating target should not have a lease if the old primary was on an old version.

Contributor Author

This change is already a substantial +665/-185, and I think it's unwise to bring BWC into scope at this time. Note that this PR is against a feature branch, not master, so we're ok with missing features for now.
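To make the scoping concrete, the gating under discussion has roughly the following shape (a sketch only; innerAddRetentionLease and the global-checkpoint bookkeeping are named after code quoted elsewhere in this thread, and the exact lookup is an assumption):

```java
if (retentionLeases.get(leaseId) == null) {
    if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)) {
        // Indices created on or after 8.0 always get a peer-recovery retention lease for the
        // primary's own copy; operations strictly above the local global checkpoint are retained.
        innerAddRetentionLease(leaseId, Math.max(0L, globalCheckpoint + 1),
            PEER_RECOVERY_RETENTION_LEASE_SOURCE);
    }
    // Indices created before 8.0 (i.e. reached via a rolling upgrade) are deliberately
    // left without a lease here; BWC handling is out of scope for this PR.
}
```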

&& shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
runUnderPrimaryPermit(() -> {
try {
// blindly create the lease. TODO integrate this with the recovery process
Contributor

I'm not sure what you mean by "blindly" here and what integration you're referring to.

Contributor Author

With this change, retention leases have no impact on the recovery process, nor do we make any attempt to only add a lease for history we've any hope of retaining; e.g. with a file-based recovery we add a lease covering all history.

In due course the recovery process will be made more dependent on leases.

: routingTable.activeShards() + " vs " + shardAllocationId;
assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));

// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
Contributor

We don't need a sync, but why not do one anyway? This will persist the leases locally on disk.

Contributor Author

Doing a sync on the cluster applier thread isn't possible as things stand because of the reroute phase; it would also mean waiting for the sync to return, which is something we try to avoid on the applier thread.

We could explicitly persist the leases when calling activatePrimaryMode but I don't think it's necessary to do so.

* Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
* properly. TODO remove this.
*/
public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
Contributor

why are we not automatically advancing the leases when the global checkpoints advance? Is it because it breaks some tests right now?

Contributor Author

Mainly because I think this change is already large enough without this feature, and we haven't settled definitively on whether these leases should be GCP-based. Advancing the leases is needed in only a few places in the tests, but I haven't tried advancing them more eagerly.
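For readers following along, the temporary test-only helper amounts to something like this (a sketch; renewRetentionLease, the checkpoints bookkeeping, and the lease-ID lookup are assumptions based on names quoted in this PR):

```java
public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
    assert primaryMode;
    for (final ShardRouting shardRouting : routingTable) {
        if (shardRouting.assignedToNode() == false) {
            continue;
        }
        // Bump each tracked copy's peer-recovery lease so that it retains operations strictly
        // above that copy's global checkpoint, mimicking what automatic advancement would do.
        final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
        renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId()),
            Math.max(0L, checkpointState.globalCheckpoint + 1),
            PEER_RECOVERY_RETENTION_LEASE_SOURCE);
    }
}
```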

Contributor Author

@DaveCTurner DaveCTurner left a comment

Thanks @ywelsch, I responded.

@DaveCTurner DaveCTurner requested a review from ywelsch June 17, 2019 12:30
Member

@dnhatn dnhatn left a comment

Thanks @DaveCTurner. I left some comments.

/**
* Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";
Member

How about moving this constant and the two related static methods to the RetentionLease class instead?

Contributor Author

I don't think RetentionLease should know about this special kind of retention lease.

final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
.leases()
.stream()
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
.collect(Collectors.groupingBy(lease -> {
if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
Member

Can we make this check a method of RetentionLease?

Contributor Author

As in #43190 (comment) I don't think RetentionLease should know about this special kind of retention lease.
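To keep the thread self-contained, the special-casing being discussed follows the behaviour in the PR description: leases for active copies never expire, while leases for inactive copies expire immediately once the shard is fully allocated. Roughly (a sketch; isLeaseForTrackedShardCopy is a hypothetical helper and the allShardsStarted usage is an assumption):

```java
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
    .leases()
    .stream()
    .collect(Collectors.groupingBy(lease -> {
        if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
            if (isLeaseForTrackedShardCopy(lease)) {   // hypothetical helper
                return false;                          // leases for active copies never expire
            }
            // leases for inactive copies expire immediately once the shard is fully allocated
            return routingTable.allShardsStarted();
        }
        // all other leases expire purely by age, as before
        return currentTimeMillis - lease.timestamp() > retentionLeaseMillis;
    }));
```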

*
* @param retentionLeases the retention lease collection
* @return the map from retention lease ID to retention lease
*/
static Map<String, RetentionLease> toMap(final RetentionLeases retentionLeases) {
return retentionLeases.leases;
public static Map<String, RetentionLease> toMapExcludingPeerRecoveryRetentionLeases(final RetentionLeases retentionLeases) {
Member

Can we move this method to the tests? Maybe the test framework?

Contributor Author

Sure, I pushed 78dd210.
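For reference, such a helper is just a filter over the lease collection, along these lines (a sketch of the idea rather than the code in 78dd210; java.util.function.Function and Collectors are assumed to be imported):

```java
public static Map<String, RetentionLease> toMapExcludingPeerRecoveryRetentionLeases(final RetentionLeases retentionLeases) {
    return retentionLeases.leases().stream()
        // drop the peer-recovery leases so tests can reason about the remaining (e.g. CCR) leases only
        .filter(lease -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source()) == false)
        .collect(Collectors.toMap(RetentionLease::id, Function.identity()));
}
```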

* containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
* with sequence numbers strictly greater than the given global checkpoint.
*/
public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
Member

Can we remove this method and prepare these parameters in IndexShard instead?

Contributor Author

We could, but I think it's appropriate to do this here given that you need to do this when working with the ReplicationTracker in isolation, e.g. in PeerRecoveryRetentionLeaseExpiryTests.
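For context, the convenience being defended here is roughly the following (a sketch; addRetentionLease's exact use is an assumption, but the +1 follows the "strictly greater than the given global checkpoint" semantics documented above):

```java
public void addPeerRecoveryRetentionLease(final String nodeId, final long globalCheckpoint,
                                          final ActionListener<ReplicationResponse> listener) {
    // The lease ID embeds the persistent node ID; retaining globalCheckpoint + 1 means all
    // operations with a sequence number strictly greater than the global checkpoint are kept.
    addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1,
        PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
}
```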

.flatMap(n -> StreamSupport.stream(getLeaderCluster().getInstance(IndicesService.class, n).spliterator(), false))
.flatMap(n -> StreamSupport.stream(n.spliterator(), false))
.filter(indexShard -> indexShard.shardId().getIndexName().equals("index1"))
.filter(indexShard -> indexShard.routingEntry().primary())
Member

This is used in tests only but we should make it more robust. See #40386 (comment).

Contributor Author

This is here to call the temporary advancePeerRecoveryRetentionLeasesToGlobalCheckpoints method, pending implementation of the proper way to advance the leases. Once that happens, it'll be gone. Are you saying that this test sometimes fails? I don't expect the primaries on the leader to move around during this test.

runUnderPrimaryPermit(() -> {
try {
// blindly create the lease. TODO integrate this with the recovery process
shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), startingSeqNo - 1, establishRetentionLeaseStep);
Member

The second parameter of addPeerRecoveryRetentionLease is "global checkpoint", which does not match startingSeqNo - 1.

Contributor Author

Indeed, but it will be once we only copy operations that are above the GCP, since then startingSeqNo will be the global checkpoint + 1, so startingSeqNo - 1 will be exactly the global checkpoint :)

Contributor Author

I pushed a clarification in 47b6b42.

@DaveCTurner DaveCTurner requested a review from dnhatn June 18, 2019 11:09
Member

@dnhatn dnhatn left a comment

LGTM given this PR will go into a feature branch. Thanks @DaveCTurner.

@DaveCTurner DaveCTurner merged commit 2ec1483 into elastic:peer-recovery-retention-leases Jun 19, 2019
@DaveCTurner DaveCTurner deleted the 2019-06-13-create-peer-recovery-retention-leases branch June 19, 2019 16:39
DaveCTurner added a commit that referenced this pull request Jun 19, 2019
This creates a peer-recovery retention lease for every shard during recovery,
ensuring that the replication group retains history for future peer recoveries.
It also ensures that leases for active shard copies do not expire, and leases
for inactive shard copies expire immediately if the shard is fully-allocated.

Relates #41536