Skip to content

Commit

Permalink
SAI-4358 : PRS collection deletion race condition (#103)
Browse files Browse the repository at this point in the history
* Adding Breakpoint injection to CommonTestInjection

* ./gradlew tidy

* Fixed javadoc

* Fixed race condition for PRS collection on deletion

Added unit test case in ZkStateReaderTest#testDeletePrsCollection

* ./gradlew tidy

* Separate the exception handling of NoNodeException and PrsZkNodeNotFoundException as their conditions/handling are quite different.

* Fixed Execution failed for task ':solr:solrj-zookeeper:validateLogCalls'.

* ./gradlew tidy

* Fixed validateLogCalls
  • Loading branch information
patsonluk authored May 24, 2023
1 parent 21f8b09 commit cce48a0
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@
import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
Expand Down Expand Up @@ -665,4 +668,126 @@ public void testWatchRaceCondition() throws Exception {
ExecutorUtil.awaitTermination(executorService);
}
}

/**
* Ensure that collection state fetching (getCollectionLive etc.) would not throw exception when
* the state.json is deleted in between the state.json read and PRS entries read
*/
public void testDeletePrsCollection() throws Exception {
ZkStateWriter writer = fixture.writer;
ZkStateReader reader = fixture.reader;

String collectionName = "c1";
fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);

ClusterState clusterState = reader.getClusterState();

String nodeName = "node1:10000_solr";
String sliceName = "shard1";
Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName);

// create new collection
DocCollection state =
new DocCollection(
collectionName,
Map.of(sliceName, slice),
Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
DocRouter.DEFAULT,
0,
new PerReplicaStatesFetcher.LazyPrsSupplier(
fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
clusterState = writer.writePendingUpdates();

TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
timeOut.waitFor(
"Timeout on waiting for c1 to show up in cluster state",
() -> reader.getClusterState().getCollectionOrNull(collectionName) != null);

String collectionPath = ZkStateReader.getCollectionPath(collectionName);

// now create the replica, take note that this has to be done after DocCollection creation with
// empty slice,
// otherwise the DocCollection ctor would fetch the PRS entries and throw exceptions
String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http");

String replicaName = "replica1";
Replica replica =
new Replica(
replicaName,
Map.of(
ZkStateReader.CORE_NAME_PROP,
"core1",
ZkStateReader.STATE_PROP,
Replica.State.ACTIVE.toString(),
ZkStateReader.NODE_NAME_PROP,
nodeName,
ZkStateReader.BASE_URL_PROP,
replicaBaseUrl,
ZkStateReader.REPLICA_TYPE,
Replica.Type.NRT.name()),
collectionName,
sliceName);

wc =
new ZkWriteCommand(
collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica));
writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
clusterState = writer.writePendingUpdates();

timeOut.waitFor(
"Timeout on waiting for replica to show up in cluster state",
() ->
reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName)
!= null);

try {
// set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete
// the state.json and
// PRS entries to trigger the race condition
CommonTestInjection.setBreakpoint(
PerReplicaStatesFetcher.class.getName() + "/beforePrsFetch",
(args) -> {
try {
// this is invoked after ZkStateReader.fetchCollectionState has fetched the state.json
// but before PRS entries.
// call delete state.json on ZK directly, very tricky to control execution order with
// writer.enqueueUpdate
reader.getZkClient().clean(collectionPath);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
});

// set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within
// the execution flow,
// such exception is caught within the logic and not thrown to the caller
AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false);
CommonTestInjection.setBreakpoint(
ZkStateReader.class.getName() + "/exercised",
(args) -> {
if (args[0] instanceof PerReplicaStatesFetcher.PrsZkNodeNotFoundException) {
prsZkNodeNotFoundExceptionThrown.set(true);
}
});

timeOut.waitFor(
"Timeout waiting for collection state to become null",
() -> {
// this should not throw exception even if the PRS entry read is delayed artificially
// (by previous command)
// and deleted after the following getCollectionLive call
return reader.getCollectionLive(collectionName) == null;
});

assertTrue(prsZkNodeNotFoundExceptionThrown.get());
} finally {
// clear breakpoints
CommonTestInjection.setBreakpoint(ZkStateReader.class.getName() + "/beforePrsFetch", null);
CommonTestInjection.setBreakpoint(ZkStateReader.class.getName() + "/exercised", null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand All @@ -35,6 +36,8 @@ public class PerReplicaStatesFetcher {
public static PerReplicaStates fetch(
String path, SolrZkClient zkClient, PerReplicaStates current) {
try {
assert CommonTestInjection.injectBreakpoint(
PerReplicaStatesFetcher.class.getName() + "/beforePrsFetch");
if (current != null) {
Stat stat = zkClient.exists(current.path, null, true);
if (stat == null) return new PerReplicaStates(path, 0, Collections.emptyList());
Expand All @@ -43,6 +46,11 @@ public static PerReplicaStates fetch(
Stat stat = new Stat();
List<String> children = zkClient.getChildren(path, null, stat, true);
return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
} catch (KeeperException.NoNodeException e) {
throw new PrsZkNodeNotFoundException(
SolrException.ErrorCode.SERVER_ERROR,
"Error fetching per-replica states. The node [" + path + "] is not found",
e);
} catch (KeeperException e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
Expand All @@ -60,4 +68,10 @@ public LazyPrsSupplier(SolrZkClient zkClient, String collectionPath) {
super(() -> PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null));
}
}

public static class PrsZkNodeNotFoundException extends SolrException {
private PrsZkNodeNotFoundException(ErrorCode code, String msg, Throwable cause) {
super(code, msg, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,20 @@ private DocCollection fetchCollectionState(String coll, Watcher watcher)
}
}
return null;
} catch (PerReplicaStatesFetcher.PrsZkNodeNotFoundException e) {
CommonTestInjection.injectBreakpoint(ZkStateReader.class.getName() + "/exercised", e);
// could be a race condition that state.json and PRS entries are deleted between the
// state.json fetch and PRS entry fetch
Stat exists = zkClient.exists(collectionPath, watcher, true);
if (exists == null) {
log.info(
"PRS entry for collection {} not found in ZK. It was probably deleted between state.json read and PRS entry read.",
coll);

return null;
} else {
throw e; // unexpected, PRS node not found but the collection state.json still exists
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,6 +34,7 @@ public class CommonTestInjection {

private static volatile Map<String, String> additionalSystemProps = null;
private static volatile Integer delay = null;
private static final ConcurrentMap<String, Breakpoint> breakpoints = new ConcurrentHashMap<>();

public static void reset() {
additionalSystemProps = null;
Expand Down Expand Up @@ -73,4 +76,66 @@ public static boolean injectDelay() {
}
return true;
}

/**
* This is usually set by the test cases.
*
* <p>If defined, code execution would break at certain code execution point at the invocation of
* injectBreakpoint with matching key until the provided method in the {@link Breakpoint}
* implementation is executed.
*
* <p>Setting the breakpoint to null would remove the breakpoint
*
* @see CommonTestInjection#injectBreakpoint(String, Object...)
* @param key could simply be the fully qualified class name or more granular like class name +
* other id (such as method name). This should batch the key used in injectBreakpoint
* @param breakpoint The Breakpoint implementation, null to remove the breakpoint
*/
public static void setBreakpoint(String key, Breakpoint breakpoint) {
if (breakpoint != null) {
breakpoints.put(key, breakpoint);
} else {
breakpoints.remove(key);
}
}

/**
* Injects a breakpoint that pauses the existing code execution, executes the code defined in the
* breakpoint implementation and then resumes afterwards. The breakpoint implementation is looked
* up by the corresponding key used in {@link CommonTestInjection#setBreakpoint(String,
* Breakpoint)}
*
* <p>An example usages :
*
* <ol>
* <li>Inject a precise wait until a race condition is fulfilled before proceeding with original
* code execution
* <li>Inject a flag to catch exception statement which handles the exception without
* re-throwing. This could verify caught exception does get triggered
* </ol>
*
* <p>This should always be a part of an assert statement (ie assert injectBreakpoint(key)) such
* that it will be skipped for normal code execution
*
* @see CommonTestInjection#setBreakpoint(String, Breakpoint)
* @param key could simply be the fully qualified class name or more granular like class name +
* other id (such as method name). This should only be set by corresponding unit test cases
* with CommonTestInjection#setBreakpoint
* @param args optional arguments list to be passed to the Breakpoint
*/
public static boolean injectBreakpoint(String key, Object... args) {
Breakpoint breakpoint = breakpoints.get(key);
if (breakpoint != null) {
breakpoint.executeAndResume(args);
}
return true;
}

public interface Breakpoint {
/**
* Code execution should break at where the breakpoint was injected, then it would execute this
* method and resumes the execution afterwards.
*/
void executeAndResume(Object... args);
}
}

0 comments on commit cce48a0

Please sign in to comment.