Skip to content

Commit

Permalink
[Segment Replication] Update PrimaryShardAllocator to prefer replicas…
Browse files Browse the repository at this point in the history
… with higher replication checkpoint (#4041)

* [Segment Replication] Update PrimaryShardAllocator to prefer replicas having higher replication checkpoint

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Use empty replication checkpoint to avoid NPE

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Update NodeGatewayStartedShards to optionally wire in/out ReplicationCheckpoint field

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Use default replication checkpoint, as the empty checkpoint was causing EOF errors

* Add indexSettings to GatewayAllocator to allow ReplicationCheckpoint comparator only for segrep enabled indices

* Add unit tests for primary term first replica promotion & comparator fix

* Fix NPE on empty IndexMetadata

* Remove settings from AllocationService and directly inject in GatewayAllocator

* Add more unit tests and minor code clean up

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Address review comments & integration test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix comparator on null ReplicationCheckpoint

Signed-off-by: Suraj Singh <surajrider@gmail.com>

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 authored Aug 18, 2022
1 parent 36f1d77 commit d308a29
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ private static ShardStoreInfo shardStoreInfo(NodeGatewayStartedShards nodeShardS
NodeGatewayStartedShards::primary
).reversed();

/**
 * Orders shard states by their {@code replicationCheckpoint()} using the checkpoint's natural ordering
 * (its {@code compareTo}); shard states whose checkpoint is {@code null} are sorted after all non-null ones.
 */
private static final Comparator<NodeGatewayStartedShards> HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR = Comparator.comparing(
NodeGatewayStartedShards::replicationCheckpoint,
Comparator.nullsLast(Comparator.naturalOrder())
);

/**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
Expand Down Expand Up @@ -381,16 +386,24 @@ protected static NodeShardsResult buildNodeShardsResult(
}
}

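/**
* Orders the active shard copies using the comparators below, in priority order:
* 1. No store exception, i.e. the shard copy is readable
* 2. Prefer the previous primary shard
* 3. Prefer the shard copy with the highest replication checkpoint. This is a NO-OP for doc rep enabled indices.
* When matchAnyShard is true, copies whose allocation id is in inSyncAllocationIds are preferred ahead of all of the above.
*/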
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())
).reversed();
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
.thenComparing(PRIMARY_FIRST_COMPARATOR);
.thenComparing(PRIMARY_FIRST_COMPARATOR)
.thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
} else {
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR)
.thenComparing(HIGHEST_REPLICATION_CHECKPOINT_FIRST_COMPARATOR);
}

nodeShardStates.sort(comparator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionType;
import org.opensearch.action.FailedNodeException;
Expand All @@ -56,11 +57,13 @@
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.shard.ShardStateMetadata;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -195,17 +198,24 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
clusterService.localNode(),
allocationId,
shardStateMetadata.primary,
null,
exception
);
}
}

logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary);
final IndexShard shard = indicesService.getShardOrNull(shardId);
return new NodeGatewayStartedShards(
clusterService.localNode(),
allocationId,
shardStateMetadata.primary,
shard != null ? shard.getLatestReplicationCheckpoint() : null
);
}
logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), null, false);
return new NodeGatewayStartedShards(clusterService.localNode(), null, false, null);
} catch (Exception e) {
throw new OpenSearchException("failed to load started shards", e);
}
Expand Down Expand Up @@ -349,10 +359,10 @@ public String getCustomDataPath() {
* @opensearch.internal
*/
public static class NodeGatewayStartedShards extends BaseNodeResponse {

private final String allocationId;
private final boolean primary;
private final Exception storeException;
private final ReplicationCheckpoint replicationCheckpoint;

public NodeGatewayStartedShards(StreamInput in) throws IOException {
super(in);
Expand All @@ -363,16 +373,33 @@ public NodeGatewayStartedShards(StreamInput in) throws IOException {
} else {
storeException = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0) && in.readBoolean()) {
replicationCheckpoint = new ReplicationCheckpoint(in);
} else {
replicationCheckpoint = null;
}
}

public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary) {
this(node, allocationId, primary, null);
public NodeGatewayStartedShards(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint
) {
this(node, allocationId, primary, replicationCheckpoint, null);
}

public NodeGatewayStartedShards(DiscoveryNode node, String allocationId, boolean primary, Exception storeException) {
public NodeGatewayStartedShards(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint,
Exception storeException
) {
super(node);
this.allocationId = allocationId;
this.primary = primary;
this.replicationCheckpoint = replicationCheckpoint;
this.storeException = storeException;
}

Expand All @@ -384,6 +411,10 @@ public boolean primary() {
return this.primary;
}

/**
 * Returns the replication checkpoint carried by this response.
 *
 * @return the checkpoint, or {@code null} when none was supplied
 */
public ReplicationCheckpoint replicationCheckpoint() {
    return replicationCheckpoint;
}

/**
 * Returns the store exception carried by this response.
 *
 * @return the exception, or {@code null} when none was recorded
 */
public Exception storeException() {
    return storeException;
}
Expand All @@ -399,6 +430,14 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (replicationCheckpoint != null) {
out.writeBoolean(true);
replicationCheckpoint.writeTo(out);
} else {
out.writeBoolean(false);
}
}
}

@Override
Expand All @@ -414,14 +453,16 @@ public boolean equals(Object o) {

return primary == that.primary
&& Objects.equals(allocationId, that.allocationId)
&& Objects.equals(storeException, that.storeException);
&& Objects.equals(storeException, that.storeException)
&& Objects.equals(replicationCheckpoint, that.replicationCheckpoint);
}

/**
 * Computes a hash from the same fields that {@code equals} compares: allocationId, primary,
 * storeException and replicationCheckpoint. A {@code null} field contributes 0.
 */
@Override
public int hashCode() {
    int hash = Objects.hashCode(allocationId);
    hash = 31 * hash + (primary ? 1 : 0);
    hash = 31 * hash + Objects.hashCode(storeException);
    hash = 31 * hash + Objects.hashCode(replicationCheckpoint);
    return hash;
}

Expand All @@ -432,6 +473,9 @@ public String toString() {
if (storeException != null) {
buf.append(",storeException=").append(storeException);
}
if (replicationCheckpoint != null) {
buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString());
}
buf.append("]");
return buf.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @opensearch.internal
*/
public class ReplicationCheckpoint implements Writeable {
public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationCheckpoint> {

private final ShardId shardId;
private final long primaryTerm;
Expand Down Expand Up @@ -107,6 +107,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(segmentInfosVersion);
}

/**
 * Orders checkpoints so that one that is ahead sorts before one that is behind.
 * <p>
 * The previous implementation returned 1 whenever this checkpoint was not ahead, so two equivalent
 * checkpoints each reported the other as smaller. That breaks the sign-antisymmetry required by
 * {@link Comparable} and can make sorting fail or behave unpredictably. Equivalent checkpoints now
 * compare as 0.
 * <p>
 * NOTE(review): if {@code isAheadOf} can hold in both directions for some pair, this ordering is still
 * not antisymmetric for that pair; confirm against the definition of {@code isAheadOf}.
 *
 * @param other the checkpoint to compare against
 * @return -1 if this checkpoint is ahead of {@code other}, 1 if {@code other} is ahead of this
 *         checkpoint, and 0 if neither is ahead of the other
 */
@Override
public int compareTo(ReplicationCheckpoint other) {
    if (this.isAheadOf(other)) {
        return -1;
    }
    if (other == null) {
        // Keep the original result for a null argument that is not reported as behind.
        return 1;
    }
    return other.isAheadOf(this) ? 1 : 0;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Loading

0 comments on commit d308a29

Please sign in to comment.