Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAI-4358 : PRS collection deletion race condition #103

Merged
merged 9 commits into from
May 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@
import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
Expand Down Expand Up @@ -665,4 +668,126 @@ public void testWatchRaceCondition() throws Exception {
ExecutorUtil.awaitTermination(executorService);
}
}

/**
 * Ensure that collection state fetching (getCollectionLive etc.) would not throw exception when
 * the state.json is deleted in between the state.json read and PRS entries read.
 *
 * <p>The test registers two breakpoints: one that deletes the collection znode right before the
 * PRS entries are fetched, and one that records that the resulting {@code
 * PrsZkNodeNotFoundException} was actually raised and handled internally. Both breakpoints are
 * removed in the {@code finally} block using the very same keys they were registered with, so
 * they cannot leak into other tests.
 */
public void testDeletePrsCollection() throws Exception {
  ZkStateWriter writer = fixture.writer;
  ZkStateReader reader = fixture.reader;

  String collectionName = "c1";
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);

  ClusterState clusterState = reader.getClusterState();

  String nodeName = "node1:10000_solr";
  String sliceName = "shard1";
  Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName);

  // create new collection
  DocCollection state =
      new DocCollection(
          collectionName,
          Map.of(sliceName, slice),
          Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
  ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
  writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
  clusterState = writer.writePendingUpdates();

  TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
  timeOut.waitFor(
      "Timeout on waiting for c1 to show up in cluster state",
      () -> reader.getClusterState().getCollectionOrNull(collectionName) != null);

  String collectionPath = ZkStateReader.getCollectionPath(collectionName);

  // now create the replica, take note that this has to be done after DocCollection creation
  // with empty slice, otherwise the DocCollection ctor would fetch the PRS entries and throw
  // exceptions
  String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http");

  String replicaName = "replica1";
  Replica replica =
      new Replica(
          replicaName,
          Map.of(
              ZkStateReader.CORE_NAME_PROP,
              "core1",
              ZkStateReader.STATE_PROP,
              Replica.State.ACTIVE.toString(),
              ZkStateReader.NODE_NAME_PROP,
              nodeName,
              ZkStateReader.BASE_URL_PROP,
              replicaBaseUrl,
              ZkStateReader.REPLICA_TYPE,
              Replica.Type.NRT.name()),
          collectionName,
          sliceName);

  wc =
      new ZkWriteCommand(
          collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica));
  writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
  // the returned cluster state is not needed any more, so it is intentionally not stored
  writer.writePendingUpdates();

  timeOut.waitFor(
      "Timeout on waiting for replica to show up in cluster state",
      () ->
          reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName)
              != null);

  // Compute each breakpoint key exactly once so that registration and cleanup are guaranteed
  // to use the same key.
  String beforePrsFetchKey = PerReplicaStatesFetcher.class.getName() + "/beforePrsFetch";
  String exercisedKey = ZkStateReader.class.getName() + "/exercised";

  try {
    // set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete
    // the state.json and PRS entries to trigger the race condition
    CommonTestInjection.setBreakpoint(
        beforePrsFetchKey,
        (args) -> {
          try {
            // this is invoked after ZkStateReader.fetchCollectionState has fetched the
            // state.json but before PRS entries.
            // call delete state.json on ZK directly, very tricky to control execution order
            // with writer.enqueueUpdate
            reader.getZkClient().clean(collectionPath);
          } catch (InterruptedException e) {
            // restore the interrupt flag before converting to an unchecked exception
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          } catch (KeeperException e) {
            throw new RuntimeException(e);
          }
        });

    // set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within
    // the execution flow, such exception is caught within the logic and not thrown to the
    // caller
    AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false);
    CommonTestInjection.setBreakpoint(
        exercisedKey,
        (args) -> {
          if (args[0] instanceof PerReplicaStatesFetcher.PrsZkNodeNotFoundException) {
            prsZkNodeNotFoundExceptionThrown.set(true);
          }
        });

    timeOut.waitFor(
        "Timeout waiting for collection state to become null",
        () -> {
          // this should not throw exception even if the PRS entry read is delayed artificially
          // (by previous command) and deleted after the following getCollectionLive call
          return reader.getCollectionLive(collectionName) == null;
        });

    assertTrue(
        "Expected PrsZkNodeNotFoundException to be raised and handled internally",
        prsZkNodeNotFoundExceptionThrown.get());
  } finally {
    // clear breakpoints; the keys must be the ones used for registration above, otherwise the
    // breakpoint stays in the static registry and fires in unrelated tests
    CommonTestInjection.setBreakpoint(beforePrsFetchKey, null);
    CommonTestInjection.setBreakpoint(exercisedKey, null);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand All @@ -35,6 +36,8 @@ public class PerReplicaStatesFetcher {
public static PerReplicaStates fetch(
String path, SolrZkClient zkClient, PerReplicaStates current) {
try {
assert CommonTestInjection.injectBreakpoint(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to assert here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that in normal (non-unit-test) execution, where Java assertions are disabled, this call is skipped entirely.

PerReplicaStatesFetcher.class.getName() + "/beforePrsFetch");
if (current != null) {
Stat stat = zkClient.exists(current.path, null, true);
if (stat == null) return new PerReplicaStates(path, 0, Collections.emptyList());
Expand All @@ -43,6 +46,11 @@ public static PerReplicaStates fetch(
Stat stat = new Stat();
List<String> children = zkClient.getChildren(path, null, stat, true);
return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
} catch (KeeperException.NoNodeException e) {
throw new PrsZkNodeNotFoundException(
SolrException.ErrorCode.SERVER_ERROR,
"Error fetching per-replica states. The node [" + path + "] is not found",
e);
} catch (KeeperException e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
Expand All @@ -60,4 +68,10 @@ public LazyPrsSupplier(SolrZkClient zkClient, String collectionPath) {
super(() -> PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null));
}
}

/**
 * Signals that the ZooKeeper node holding the per-replica states could not be found. It is raised
 * by {@code fetch} when ZooKeeper reports a {@code KeeperException.NoNodeException}, which lets
 * callers tell a missing node apart from other fetch failures.
 *
 * <p>The constructor is private, so instances can only be created from within the enclosing
 * class.
 */
public static class PrsZkNodeNotFoundException extends SolrException {
  /**
   * @param errorCode the Solr error code to report
   * @param message human-readable description of the failure
   * @param rootCause the underlying exception that triggered this one
   */
  private PrsZkNodeNotFoundException(ErrorCode errorCode, String message, Throwable rootCause) {
    super(errorCode, message, rootCause);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,20 @@ private DocCollection fetchCollectionState(String coll, Watcher watcher)
}
}
return null;
} catch (PerReplicaStatesFetcher.PrsZkNodeNotFoundException e) {
CommonTestInjection.injectBreakpoint(ZkStateReader.class.getName() + "/exercised", e);
// could be a race condition that state.json and PRS entries are deleted between the
// state.json fetch and PRS entry fetch
Stat exists = zkClient.exists(collectionPath, watcher, true);
if (exists == null) {
log.info(
"PRS entry for collection {} not found in ZK. It was probably deleted between state.json read and PRS entry read.",
coll);

return null;
} else {
throw e; // unexpected, PRS node not found but the collection state.json still exists
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,6 +34,7 @@ public class CommonTestInjection {

private static volatile Map<String, String> additionalSystemProps = null;
private static volatile Integer delay = null;
private static final ConcurrentMap<String, Breakpoint> breakpoints = new ConcurrentHashMap<>();

public static void reset() {
additionalSystemProps = null;
Expand Down Expand Up @@ -73,4 +76,66 @@ public static boolean injectDelay() {
}
return true;
}

/**
 * Registers, replaces or removes the {@link Breakpoint} associated with a key. This is usually
 * called by test cases.
 *
 * <p>Once a breakpoint is registered, every call to {@link
 * CommonTestInjection#injectBreakpoint(String, Object...)} with a matching key runs the
 * breakpoint's method before the calling code continues.
 *
 * <p>Passing {@code null} as the breakpoint removes any breakpoint registered under the key.
 *
 * @see CommonTestInjection#injectBreakpoint(String, Object...)
 * @param key could simply be the fully qualified class name or more granular like class name +
 *     other id (such as method name). This should match the key used in injectBreakpoint
 * @param breakpoint The Breakpoint implementation, null to remove the breakpoint
 */
public static void setBreakpoint(String key, Breakpoint breakpoint) {
  if (breakpoint == null) {
    // a null breakpoint is the documented way of unregistering
    breakpoints.remove(key);
    return;
  }
  breakpoints.put(key, breakpoint);
}

/**
 * Runs the breakpoint registered under the given key, if there is one, on the calling thread and
 * then lets the caller continue. The breakpoint implementation is looked up by the key that was
 * used in {@link CommonTestInjection#setBreakpoint(String, Breakpoint)}; when nothing is
 * registered this method does nothing.
 *
 * <p>Example usages:
 *
 * <ol>
 *   <li>Inject a precise wait until a race condition is fulfilled before proceeding with the
 *       original code execution
 *   <li>Inject a flag into a catch statement which handles the exception without re-throwing.
 *       This can verify that the caught exception does get triggered
 * </ol>
 *
 * <p>This should always be part of an assert statement (i.e. {@code assert
 * injectBreakpoint(key)}) such that it will be skipped for normal code execution.
 *
 * @see CommonTestInjection#setBreakpoint(String, Breakpoint)
 * @param key could simply be the fully qualified class name or more granular like class name +
 *     other id (such as method name). This should only be set by corresponding unit test cases
 *     with CommonTestInjection#setBreakpoint
 * @param args optional arguments list to be passed to the Breakpoint
 * @return always {@code true}, so that the call can be used as the condition of an assert
 */
public static boolean injectBreakpoint(String key, Object... args) {
  Breakpoint registered = breakpoints.get(key);
  if (registered == null) {
    // nothing registered for this key: continue immediately
    return true;
  }
  registered.executeAndResume(args);
  return true;
}

/**
 * Callback executed at an injected breakpoint. It declares a single abstract method, so it can be
 * supplied as a lambda or method reference; {@code @FunctionalInterface} makes the compiler
 * enforce that this stays true.
 */
@FunctionalInterface
public interface Breakpoint {
  /**
   * Code execution should break at where the breakpoint was injected, then it would execute this
   * method and resume the execution afterwards.
   *
   * @param args optional arguments supplied at the point where the breakpoint was injected; may
   *     be empty
   */
  void executeAndResume(Object... args);
}
}