Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Update PrimaryShardAllocator to prefer replicas with higher replication checkpoint #4041

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS
NodeGatewayStartedShards::primary
).reversed();

/**
 * Orders shard copies by their replication checkpoint, using the checkpoint's natural ordering
 * ({@code ReplicationCheckpoint#compareTo}, which ranks the checkpoint that is ahead first).
 * Copies that report no checkpoint ({@code null}) are placed last.
 */
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShards::replicationCheckpoint,
Comparator.nullsLast(Comparator.naturalOrder())
);

/**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
Expand Down Expand Up @@ -381,16 +386,24 @@ protected static NodeShardsResult buildNodeShardsResult(
}
}

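/**
* Orders the active shard copies using the comparators below, in decreasing priority:
* 1. No store exception, i.e. the shard copy is readable
* 2. Prefer the previous primary shard
* 3. Prefer the shard copy with the highest replication checkpoint. This is a no-op for indices using document replication (doc rep).
* When matchAnyShard is set, copies whose allocation id is in inSyncAllocationIds are preferred ahead of all of the above.
*/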
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
).reversed();
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
.thenComparing(PRIMARY_FIRST_COMPARATOR);
.thenComparing(PRIMARY_FIRST_COMPARATOR)
.thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
} else {
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR)
.thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
}

nodeShardStates.sort(comparator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionType;
import org.opensearch.action.FailedNodeException;
Expand All @@ -56,11 +57,13 @@
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.shard.ShardStateMetadata;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -195,17 +198,24 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
clusterService.localNode(),
allocationId,
shardStateMetadata.primary,
null,
exception
);
}
}

logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary);
final IndexShard shard = indicesService.getShardOrNull(shardId);
return new NodeGatewayStartedShards(
clusterService.localNode(),
allocationId,
shardStateMetadata.primary,
shard != null ? shard.getLatestReplicationCheckpoint() : null
);
}
logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), null, false);
return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null);
} catch (Exception e) {
throw new OpenSearchException("failed to load started shards", e);
}
Expand Down Expand Up @@ -349,10 +359,10 @@ public String getCustomDataPath() {
* @opensearch.internal
*/
public static class NodeGatewayStartedShards extends BaseNodeResponse {

private final String allocationId;
private final boolean primary;
private final Exception storeException;
private final ReplicationCheckpoint replicationCheckpoint;

public NodeGatewayStartedShards(StreamInput in) throws IOException {
super(in);
Expand All @@ -363,16 +373,33 @@ public NodeGatewayStartedShards(StreamInput in) throws IOException {
} else {
storeException = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0) && in.readBoolean()) {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
replicationCheckpoint = new ReplicationCheckpoint(in);
} else {
replicationCheckpoint = null;
}
}

public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary) {
this(node, allocationId, primary, null);
public NodeGatewayStartedShards(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint
) {
this(node, allocationId, primary, replicationCheckpoint, null);
}

public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary, Exception storeException) {
public NodeGatewayStartedShards(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint,
Exception storeException
) {
super(node);
this.allocationId = allocationId;
this.primary = primary;
this.replicationCheckpoint = replicationCheckpoint;
this.storeException = storeException;
}

Expand All @@ -384,6 +411,10 @@ public boolean primary() {
return this.primary;
}

/**
 * Returns the replication checkpoint carried by this response.
 *
 * @return the checkpoint, or {@code null} when none was supplied or read from the stream
 */
public ReplicationCheckpoint replicationCheckpoint() {
    return replicationCheckpoint;
}

/**
 * Returns the store exception carried by this response.
 *
 * @return the exception, or {@code null} when none was recorded
 */
public Exception storeException() {
    return storeException;
}
Expand All @@ -399,6 +430,14 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure we update this to V_2_3_0 when this is backported.

if (replicationCheckpoint != null) {
out.writeBoolean(true);
replicationCheckpoint.writeTo(out);
} else {
out.writeBoolean(false);
}
}
}

@Override
Expand All @@ -414,14 +453,16 @@ public boolean equals(Object o) {

return primary == that.primary
&& Objects.equals(allocationId, that.allocationId)
&& Objects.equals(storeException, that.storeException);
&& Objects.equals(storeException, that.storeException)
&& Objects.equals(replicationCheckpoint, that.replicationCheckpoint);
}

/**
 * Combines the allocation id, the primary flag, the store exception and the replication
 * checkpoint into a 31-based hash; absent (null) components contribute 0.
 */
@Override
public int hashCode() {
    // Objects.hashCode(x) yields 0 for null and x.hashCode() otherwise.
    int hash = Objects.hashCode(allocationId);
    hash = 31 * hash + (primary ? 1 : 0);
    hash = 31 * hash + Objects.hashCode(storeException);
    hash = 31 * hash + Objects.hashCode(replicationCheckpoint);
    return hash;
}

Expand All @@ -432,6 +473,9 @@ public String toString() {
if (storeException != null) {
buf.append(",storeException=").append(storeException);
}
if (replicationCheckpoint != null) {
buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString());
}
buf.append("]");
return buf.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @opensearch.internal
*/
public class ReplicationCheckpoint implements Writeable {
public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationCheckpoint> {

private final ShardId shardId;
private final long primaryTerm;
Expand Down Expand Up @@ -107,6 +107,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(segmentInfosVersion);
}

/**
 * Compares this checkpoint with another so that the checkpoint that is further ahead sorts first.
 *
 * <p>Unlike a plain {@code isAheadOf(other) ? -1 : 1}, this returns 0 when neither checkpoint is
 * ahead of the other, so {@code a.compareTo(b)} and {@code b.compareTo(a)} have opposite signs
 * as the {@link Comparable} contract requires. Without that, sorting can fail with
 * "Comparison method violates its general contract".
 *
 * @param other the checkpoint to compare against
 * @return -1 if this checkpoint is ahead of {@code other}, 1 if {@code other} is ahead of this
 *         one (or {@code other} is null and this is not reported as ahead), 0 otherwise
 */
@Override
public int compareTo(ReplicationCheckpoint other) {
    if (this.isAheadOf(other)) {
        return -1;
    }
    // Keep the previous result for a null argument that isAheadOf does not treat as "behind".
    if (other == null || other.isAheadOf(this)) {
        return 1;
    }
    // Neither side is ahead: treat the two checkpoints as equivalent for ordering purposes.
    // NOTE(review): if isAheadOf can be true in both directions (e.g. higher term on one side,
    // higher segment infos version on the other), the ordering is still asymmetric - confirm.
    return 0;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Loading