Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce primary/replica mode for GlobalCheckPointTracker #25468

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ public void readFrom(StreamInput in) throws IOException {
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
localCheckpoint = SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT;
}
}

Expand Down Expand Up @@ -1202,6 +1202,8 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;

import java.util.Set;
Expand All @@ -41,6 +40,11 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*/
public static final long NO_OPS_PERFORMED = -1L;

/**
* Represents a local checkpoint coming from a pre-6.0 node. Its value ({@code -3}) differs from
* {@link #NO_OPS_PERFORMED} ({@code -1}), so the two cases can be told apart.
*/
public static final long PRE_60_NODE_LOCAL_CHECKPOINT = -3L;

private final LocalCheckpointTracker localCheckpointTracker;
private final GlobalCheckpointTracker globalCheckpointTracker;

Expand Down Expand Up @@ -126,6 +130,16 @@ public void updateLocalCheckpointForShard(final String allocationId, final long
globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint);
}

/**
* Called when the recovery process for a shard is ready to open the engine on the target shard. This method only forwards the
* allocation ID to the global checkpoint tracker; see {@link GlobalCheckpointTracker#initiateTracking(String)} for details.
*
* @param allocationId the allocation ID of the shard for which recovery was initiated
*/
public void initiateTracking(final String allocationId) {
globalCheckpointTracker.initiateTracking(allocationId);
}

/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} for additional details.
Expand Down Expand Up @@ -164,26 +178,45 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {
globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint);
}

/**
* Returns the local checkpoint information tracked for a specific shard. Used by tests.
*
* @param allocationId the allocation ID of the shard copy to look up
* @return the local checkpoint read from the state that the global checkpoint tracker returns for {@code allocationId}
*/
// NOTE(review): this is the only synchronized method visible in this view of the class - confirm the lock is still needed.
public synchronized long getTrackedLocalCheckpointForShard(final String allocationId) {
// NOTE(review): the tracker's result is dereferenced without a null check; presumably the allocation ID must already be tracked - confirm.
return globalCheckpointTracker.getTrackedLocalCheckpointForShard(allocationId).getLocalCheckpoint();
}

/**
* Activates the global checkpoint tracker in primary mode (see {@link GlobalCheckpointTracker#primaryMode}).
* Called on primary activation or promotion.
*
* @param allocationId the allocation ID forwarded to the tracker (presumably that of this shard copy - confirm)
* @param localCheckpoint the local checkpoint forwarded to the tracker (presumably that of this shard copy - confirm)
*/
public void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
globalCheckpointTracker.activatePrimaryMode(allocationId, localCheckpoint);
}

/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
* {@link GlobalCheckpointTracker#updateFromMaster(long, Set, Set, Set)} for details.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
* @param pre60AllocationIds the allocation IDs of shards that are allocated to pre-6.0 nodes
*/
public void updateAllocationIdsFromMaster(
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
final long applyingClusterStateVersion, final Set<String> inSyncAllocationIds, final Set<String> initializingAllocationIds,
final Set<String> pre60AllocationIds) {
globalCheckpointTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, initializingAllocationIds,
pre60AllocationIds);
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
* Activates the global checkpoint tracker in primary mode (see {@link GlobalCheckpointTracker#primaryMode}).
* Called on primary relocation target during primary relocation handoff.
*
* @param primaryContext the sequence number context
* @param primaryContext the primary context used to initialize the state
*/
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext);
public void activateWithPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) {
globalCheckpointTracker.activateWithPrimaryContext(primaryContext);
}

/**
Expand All @@ -200,15 +233,22 @@ public boolean pendingInSync() {
*
* @return the primary context
*/
public PrimaryContext primaryContext() {
return globalCheckpointTracker.primaryContext();
public GlobalCheckpointTracker.PrimaryContext startRelocationHandoff() {
return globalCheckpointTracker.startRelocationHandoff();
}

/**
* Marks a relocation handoff attempt as successful. Moves the tracker into replica mode.
* This method only forwards the call to {@link GlobalCheckpointTracker#completeRelocationHandoff()}.
*/
public void completeRelocationHandoff() {
// NOTE(review): presumably only valid after startRelocationHandoff() has been called - confirm against the tracker.
globalCheckpointTracker.completeRelocationHandoff();
}

/**
* Releases a previously acquired primary context.
* Fails a relocation handoff attempt.
*/
public void releasePrimaryContext() {
globalCheckpointTracker.releasePrimaryContext();
public void abortRelocationHandoff() {
globalCheckpointTracker.abortRelocationHandoff();
}

}
Loading