4949import org .hamcrest .Matcher ;
5050import org .junit .Before ;
5151
52+ import java .io .IOException ;
53+ import java .io .UncheckedIOException ;
5254import java .util .ArrayList ;
5355import java .util .Arrays ;
5456import java .util .Collections ;
@@ -522,6 +524,7 @@ class Cluster {
522524 final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue (
523525 // TODO does ThreadPool need a node name any more?
524526 Settings .builder ().put (NODE_NAME_SETTING .getKey (), "deterministic-task-queue" ).build (), random ());
527+ private boolean disruptStorage ;
525528 private final VotingConfiguration initialConfiguration ;
526529
527530 private final Set <String > disconnectedNodes = new HashSet <>();
@@ -566,6 +569,7 @@ void runRandomly() {
566569 logger .info ("--> start of safety phase of at least [{}] steps" , randomSteps );
567570
568571 deterministicTaskQueue .setExecutionDelayVariabilityMillis (EXTREME_DELAY_VARIABILITY );
572+ disruptStorage = true ;
569573 int step = 0 ;
570574 long finishTime = -1 ;
571575
@@ -636,7 +640,7 @@ void runRandomly() {
636640 // - reboot a node
637641 // - abdicate leadership
638642
639- } catch (CoordinationStateRejectedException ignored ) {
643+ } catch (CoordinationStateRejectedException | UncheckedIOException ignored ) {
640644 // This is ok: it just means a message couldn't currently be handled, or that a simulated storage failure was thrown.
641645 }
642646
@@ -645,6 +649,7 @@ void runRandomly() {
645649
646650 disconnectedNodes .clear ();
647651 blackholedNodes .clear ();
652+ disruptStorage = false ;
648653 }
649654
650655 private void assertConsistentStates () {
@@ -674,6 +679,7 @@ void stabilise() {
674679 void stabilise (long stabilisationDurationMillis ) {
675680 assertThat ("stabilisation requires default delay variability (and proper cleanup of raised variability)" ,
676681 deterministicTaskQueue .getExecutionDelayVariabilityMillis (), lessThanOrEqualTo (DEFAULT_DELAY_VARIABILITY ));
682+ assertFalse ("stabilisation requires stable storage" , disruptStorage );
677683
678684 if (clusterNodes .stream ().allMatch (n -> n .coordinator .getLastAcceptedState ().getLastAcceptedConfiguration ().isEmpty ())) {
679685 assertThat ("setting initial configuration may fail with disconnected nodes" , disconnectedNodes , empty ());
@@ -826,6 +832,37 @@ ClusterNode getAnyNodePreferringLeaders() {
826832 return getAnyNode ();
827833 }
828834
835+ class MockPersistedState extends InMemoryPersistedState {
836+ MockPersistedState (long term , ClusterState acceptedState ) {
837+ super (term , acceptedState );
838+ }
839+
840+ private void possiblyFail (String description ) {
841+ if (disruptStorage && rarely ()) {
842+ // TODO revisit this when we've decided how PersistedState should throw exceptions
843+ if (randomBoolean ()) {
844+ throw new UncheckedIOException (new IOException ("simulated IO exception [" + description + ']' ));
845+ } else {
846+ throw new CoordinationStateRejectedException ("simulated IO exception [" + description + ']' );
847+ }
848+ }
849+ }
850+
851+ @ Override
852+ public void setCurrentTerm (long currentTerm ) {
853+ possiblyFail ("before writing term of " + currentTerm );
854+ super .setCurrentTerm (currentTerm );
855+ // TODO possiblyFail() here if that's a failure mode of the storage layer
856+ }
857+
858+ @ Override
859+ public void setLastAcceptedState (ClusterState clusterState ) {
860+ possiblyFail ("before writing last-accepted state of term=" + clusterState .term () + ", version=" + clusterState .version ());
861+ super .setLastAcceptedState (clusterState );
862+ // TODO possiblyFail() here if that's a failure mode of the storage layer
863+ }
864+ }
865+
829866 class ClusterNode extends AbstractComponent {
830867 private final int nodeIndex ;
831868 private Coordinator coordinator ;
@@ -841,7 +878,7 @@ class ClusterNode extends AbstractComponent {
841878 super (Settings .builder ().put (NODE_NAME_SETTING .getKey (), nodeIdFromIndex (nodeIndex )).build ());
842879 this .nodeIndex = nodeIndex ;
843880 localNode = createDiscoveryNode ();
844- persistedState = new InMemoryPersistedState (0L ,
881+ persistedState = new MockPersistedState (0L ,
845882 clusterState (0L , 0L , localNode , VotingConfiguration .EMPTY_CONFIG , VotingConfiguration .EMPTY_CONFIG , 0L ));
846883 onNode (localNode , this ::setUp ).run ();
847884 }
0 commit comments