Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-16995: Use leaderEligible property where it makes sense #1981

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private final void replicate(String nodeName, SolrCore core, ZkNodeProps leaderp
log.info("Attempting to replicate from [{}].", leaderUrl);

// send commit if replica could be a leader
if (Replica.Type.isLeaderType(replicaType)) {
if (replicaType.leaderEligible) {
commitOnLeader(leaderUrl);
}

Expand Down Expand Up @@ -365,7 +365,7 @@ private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedExce
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
if (cloudDesc.isLeader()) {
assert cloudDesc.getReplicaType() != Replica.Type.PULL;
assert cloudDesc.getReplicaType().leaderEligible;
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader!");
log.info("Finished recovery process.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package org.apache.solr.cloud;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.MapWriter;
Expand Down Expand Up @@ -50,6 +52,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private final CoreContainer cc;
private final SyncStrategy syncStrategy;
private final DistributedClusterStateUpdater distributedClusterStateUpdater;
private final EnumSet<Replica.Type> leaderEligibleReplicaTypes;

private volatile boolean isClosed = false;

Expand All @@ -65,6 +68,10 @@ public ShardLeaderElectionContext(
this.cc = cc;
this.syncStrategy = new SyncStrategy(cc);
this.distributedClusterStateUpdater = zkController.getDistributedClusterStateUpdater();
leaderEligibleReplicaTypes =
Arrays.stream(Replica.Type.values())
.filter(t -> t.leaderEligible)
.collect(Collectors.toCollection(() -> EnumSet.noneOf(Replica.Type.class)));
}

@Override
Expand Down Expand Up @@ -456,16 +463,16 @@ private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedExcepti
}

// on startup and after connection timeout, wait for all known shards
if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
if (found >= slices.getReplicas(leaderEligibleReplicaTypes).size()) {
log.info("Enough replicas found to continue.");
return true;
} else {
if (cnt % 40 == 0) {
if (log.isInfoEnabled()) {
log.info(
"Waiting until we see more replicas up for shard {}: total={} found={} timeoute in={}ms",
"Waiting until we see more replicas up for shard {}: total={} found={} timeout in={}ms",
shardId,
slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(),
slices.getReplicas(leaderEligibleReplicaTypes).size(),
found,
TimeUnit.MILLISECONDS.convert(
timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
Expand Down
18 changes: 10 additions & 8 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ public void giveupLeadership(CoreDescriptor cd) {
.getReplicas(
rep ->
rep.getState() == Replica.State.ACTIVE
&& rep.getType() != Type.PULL
&& rep.getType().leaderEligible
&& liveNodes.contains(rep.getNodeName()))
.size();

Expand Down Expand Up @@ -1284,7 +1284,7 @@ public String register(
"Error registering SolrCore, replica is removed from clusterstate");
}

if (replica.getType() != Type.PULL) {
if (replica.getType().leaderEligible) {
getCollectionTerms(collection).register(cloudDesc.getShardId(), coreZkNodeName);
}

Expand All @@ -1300,7 +1300,7 @@ public String register(
try {
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
if (replica.getType() != Type.PULL) {
if (replica.getType().leaderEligible) {
joinElection(desc, afterExpiration, joinAtHead);
} else if (replica.getType() == Type.PULL) {
if (joinAtHead) {
Expand Down Expand Up @@ -1329,7 +1329,8 @@ public String register(
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.debug("We are {} and leader is {}", ourUrl, leaderUrl);
boolean isLeader = leaderUrl.equals(ourUrl);
assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
assert !isLeader || replica.getType().leaderEligible
: replica.getType().name() + " replica became leader!";

try (SolrCore core = cc.getCore(desc.getName())) {

Expand Down Expand Up @@ -1391,7 +1392,7 @@ public String register(
publish(desc, Replica.State.ACTIVE);
}

if (replica.getType() != Type.PULL) {
if (replica.getType().leaderEligible) {
// the watcher is added to a set so multiple calls of this method will left only one
// watcher
shardTerms.addListener(
Expand Down Expand Up @@ -1729,13 +1730,14 @@ public void publish(

// pull replicas are excluded because their terms are not considered
if (state == Replica.State.RECOVERING
&& cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
&& cd.getCloudDescriptor().getReplicaType().leaderEligible) {
// state is used by client, state of replica can change from RECOVERING to DOWN without
// needed to finish recovery by calling this we will know that a replica actually finished
// recovery or not
getShardTerms(collection, shardId).startRecovering(coreNodeName);
}
if (state == Replica.State.ACTIVE && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
if (state == Replica.State.ACTIVE
&& cd.getCloudDescriptor().getReplicaType().leaderEligible) {
getShardTerms(collection, shardId).doneRecovering(coreNodeName);
}

Expand Down Expand Up @@ -1824,7 +1826,7 @@ public void unregister(String coreName, CoreDescriptor cd, boolean removeCoreFro
zkStateReader.getClusterState().getCollectionOrNull(collection);
Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreNodeName);

if (replica == null || replica.getType() != Type.PULL) {
if (replica == null || replica.getType().leaderEligible) {
ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));

if (context != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ static List<String> verifyReplicaAvailability(List<Replica> sourceReplicas, Clus
// can't delete the only replica in existence
res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getType());
} else { // check replica types
int otherNonPullReplicas = 0;
int otherLeaderEligibleReplicas = 0;
for (Replica r : slice.getReplicas()) {
if (!r.getName().equals(replicaName) && !r.getType().equals(Replica.Type.PULL)) {
otherNonPullReplicas++;
if (!r.getName().equals(replicaName) && r.getType().leaderEligible) {
otherLeaderEligibleReplicas++;
}
}
// can't delete - there are no other non-pull replicas
if (otherNonPullReplicas == 0) {
// can't delete - there are no other replicas that can be leader
if (otherLeaderEligibleReplicas == 0) {
res.add(coll + "/" + shard + "/" + replicaName + ", type=" + sourceReplica.getType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionSta
// as long as it's active and can become a leader, in which case we don't have to wait
// for recovery of specifically the one that we've just added
if (!replica.getName().equals(replicaId)) {
if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
if (!replica.getType().leaderEligible) {
continue;
}
// check its state
Expand Down
10 changes: 0 additions & 10 deletions solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,6 @@ public enum Type {
public static Type get(String name) {
return name == null ? Type.NRT : Type.valueOf(name.toUpperCase(Locale.ROOT));
}

/**
* Only certain replica types can become leaders
*
* @param type the type of a replica
* @return true if that type is able to be leader, false otherwise
*/
public static boolean isLeaderType(Type type) {
return type == null || type == NRT || type == TLOG;
}
}

// immutable
Expand Down
5 changes: 2 additions & 3 deletions solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.solr.common.cloud.Replica.Type;
import org.apache.solr.common.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -241,8 +240,8 @@ private Map<String, Replica> makeReplicas(
private Replica findLeader() {
for (Replica replica : replicas.values()) {
if (replica.isLeader()) {
assert replica.getType() == Type.TLOG || replica.getType() == Type.NRT
: "Pull replica should not become leader!";
assert replica.getType().leaderEligible
: replica.getType().toString() + " replica should not become leader!";
return replica;
}
}
Expand Down
Loading