Skip to content
This repository was archived by the owner on Nov 12, 2024. It is now read-only.

Commit b36942f

Browse files
authored
Merge pull request #84 from dmitraver/tests_fixing
Fixes broken tests
2 parents e3ea9f2 + befa865 commit b36942f

19 files changed

+474
-307
lines changed

src/main/scala/pl/project13/scala/akka/raft/Candidate.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ private[raft] trait Candidate {
1818
stay()
1919

2020
// election
21-
case Event(BeginElection, m: Meta) =>
21+
case Event(msg @ BeginElection, m: Meta) =>
22+
if(raftConfig.publishTestingEvents) context.system.eventStream.publish(ElectionStarted(m.currentTerm, self))
23+
2224
if (m.config.members.isEmpty) {
2325
log.warning("Tried to initialize election with no members...")
2426
goto(Follower) applying GoToFollowerEvent()

src/main/scala/pl/project13/scala/akka/raft/Follower.scala

+6-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ private[raft] trait Follower {
1313
protected def raftConfig: RaftConfig
1414

1515
val followerBehavior: StateFunction = {
16+
17+
case Event(msg @ BeginAsFollower(term, _), m : Meta) =>
18+
if (raftConfig.publishTestingEvents) context.system.eventStream.publish(msg)
19+
stay()
20+
1621
case Event(msg: ClientMessage[Command], m: Meta) =>
1722
log.info("Follower got {} from client; Respond with last Leader that took write from: {}", msg, recentlyContactedByLeader)
1823
sender() ! LeaderIs(recentlyContactedByLeader, Some(msg))
@@ -133,7 +138,7 @@ private[raft] trait Follower {
133138

134139
case (newConfig: ClusterConfiguration) :: moreEntries if newConfig.isNewerThan(config) =>
135140
log.info("Appended new configuration (seq: {}), will start using it now: {}", newConfig.sequenceNumber, newConfig)
136-
maybeGetNewConfiguration(moreEntries, config)
141+
maybeGetNewConfiguration(moreEntries, newConfig)
137142

138143
case _ :: moreEntries =>
139144
maybeGetNewConfiguration(moreEntries, config)

src/main/scala/pl/project13/scala/akka/raft/Leader.scala

+6-4
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ private[raft] trait Leader {
1414
private val HeartbeatTimerName = "heartbeat-timer"
1515

1616
val leaderBehavior: StateFunction = {
17-
case Event(ElectedAsLeader, m: Meta) =>
17+
case Event(msg @ BeginAsLeader(term, _), m: Meta) =>
18+
if (raftConfig.publishTestingEvents) context.system.eventStream.publish(msg)
1819
log.info("Became leader for {}", m.currentTerm)
1920
initializeLeaderState(m.config.members)
2021
startHeartbeat(m)
@@ -42,10 +43,11 @@ private[raft] trait Leader {
4243
val meta = maybeUpdateConfiguration(m, entry.command)
4344
replicateLog(meta)
4445

45-
if (meta.config.isPartOfNewConfiguration(m.clusterSelf))
46-
stay() applying KeepStateEvent()
47-
else
46+
if (meta.config.isPartOfNewConfiguration(m.clusterSelf)) {
47+
stay() applying WithNewConfigEvent(config = meta.config)
48+
} else {
4849
goto(Follower) applying GoToFollowerEvent()
50+
}
4951

5052
// rogue Leader handling
5153
case Event(append: AppendEntries[Command], m: Meta) if append.term > m.currentTerm =>

src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala

+11-3
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ abstract class RaftActor extends Actor with PersistentFSM[RaftState, Meta, Domai
4646
val heartbeatInterval: FiniteDuration = raftConfig.heartbeatInterval
4747

4848

49-
override implicit def domainEventClassTag: ClassTag[DomainEvent] = classTag[DomainEvent]
49+
override def domainEventClassTag: ClassTag[DomainEvent] = classTag[DomainEvent]
5050

5151
override def persistenceId = "RaftActor-" + self.path.name
5252

5353
override def applyEvent(domainEvent: DomainEvent, data: Meta): Meta = domainEvent match {
54-
case GoToFollowerEvent(term) => term.fold(data.forFollower())(t => data.forFollower(t))
54+
case GoToFollowerEvent(term) => term.fold(data.forFollower()) {t =>
55+
if (raftConfig.publishTestingEvents) context.system.eventStream.publish(TermUpdated(t, self))
56+
data.forFollower(t)
57+
}
5558
case GoToLeaderEvent() => data.forLeader
5659
case StartElectionEvent() => data.forNewElection
5760
case KeepStateEvent() => data
@@ -60,6 +63,9 @@ abstract class RaftActor extends Actor with PersistentFSM[RaftState, Meta, Domai
6063
case VoteForSelfEvent() => data.incVote.withVoteFor(data.clusterSelf)
6164
case WithNewConfigEvent(term, config) => term.fold(data.withConfig(config))(t => data.withConfig(config).withTerm(t))
6265
case WithNewClusterSelf(s) => data.copy(clusterSelf = s)
66+
case UpdateTermEvent(term) =>
67+
if (raftConfig.publishTestingEvents) context.system.eventStream.publish(TermUpdated(term, self))
68+
data.withTerm(term)
6369
}
6470

6571
def nextElectionDeadline(): Deadline = randomElectionTimeout(
@@ -83,6 +89,7 @@ abstract class RaftActor extends Actor with PersistentFSM[RaftState, Meta, Domai
8389

8490
onTransition {
8591
case Init -> Follower if stateData.clusterSelf != self =>
92+
self ! BeginAsFollower(stateData.currentTerm, self)
8693
log.info("Cluster self != self => Running clustered via a proxy.")
8794
resetElectionDeadline()
8895

@@ -91,10 +98,11 @@ abstract class RaftActor extends Actor with PersistentFSM[RaftState, Meta, Domai
9198
resetElectionDeadline()
9299

93100
case Candidate -> Leader =>
94-
self ! ElectedAsLeader
101+
self ! BeginAsLeader(stateData.currentTerm, self)
95102
cancelElectionDeadline()
96103

97104
case _ -> Follower =>
105+
self ! BeginAsFollower(stateData.currentTerm, self)
98106
resetElectionDeadline()
99107
}
100108

src/main/scala/pl/project13/scala/akka/raft/protocol/InternalProtocol.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ private[protocol] trait InternalProtocol extends Serializable {
1111
sealed trait ElectionMessage extends Message[Internal]
1212
sealed trait LeaderMessage extends Message[Internal]
1313

14+
case class BeginAsFollower(term: Term, ref: ActorRef) extends InternalMessage
15+
1416
case object BeginElection extends ElectionMessage
1517
case class VoteCandidate(term: Term) extends ElectionMessage
1618
case class DeclineCandidate(term: Term) extends ElectionMessage
1719

18-
case object ElectedAsLeader extends ElectionMessage
20+
case class BeginAsLeader(term: Term, ref: ActorRef) extends ElectionMessage
1921
case object ElectionTimeout extends ElectionMessage
2022

2123
/** When the Leader has sent an append, for an unexpected number, the Follower replies with this */
@@ -38,5 +40,7 @@ private[protocol] trait InternalProtocol extends Serializable {
3840
// ---- testing and monitoring messages ----
3941
case class EntryCommitted(idx: Int, on: ActorRef) extends Message[Testing]
4042
case class SnapshotWritten(initialSize: Int, compactedSize: Int) extends Message[Testing]
43+
case class TermUpdated(term: Term, on: ActorRef) extends Message[Testing]
44+
case class ElectionStarted(term: Term, on: ActorRef) extends Message[Testing]
4145
// ---- end of testing and monitoring messages ----
4246
}

src/test/scala/akka/fsm/hack/TestFSMRefHack.scala

-16
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,32 @@
11
package pl.project13.scala.akka.raft
22

3-
import pl.project13.scala.akka.raft.protocol._
4-
import akka.testkit.{ImplicitSender, TestFSMRef}
3+
import akka.testkit.ImplicitSender
54
import org.scalatest.BeforeAndAfterEach
65
import org.scalatest.concurrent.Eventually
7-
import scala.concurrent.duration._
86
import org.scalatest.time.{Millis, Span}
9-
import pl.project13.scala.akka.raft.model.{Entry, Term}
107
import pl.project13.scala.akka.raft.example.protocol._
8+
import pl.project13.scala.akka.raft.model.Entry
9+
import pl.project13.scala.akka.raft.protocol._
1110

1211
class CandidateTest extends RaftSpec with BeforeAndAfterEach
1312
with Eventually
14-
with ImplicitSender {
13+
with ImplicitSender with PersistenceCleanup {
1514

1615
behavior of "Candidate"
1716

18-
val candidate = TestFSMRef(new SnapshottingWordConcatRaftActor with EventStreamAllMessages)
19-
20-
var data: Meta = _
21-
22-
val initialMembers = 0
23-
24-
override def beforeEach() {
25-
super.beforeEach()
26-
data = Meta.initial(candidate)
27-
.copy(
28-
currentTerm = Term(2),
29-
config = ClusterConfiguration(self)
30-
).forNewElection
31-
}
17+
val initialMembers = 3
3218

33-
it should "start a new election round if electionTimeout reached, and no one became Leader" in {
19+
// This test is commented because there is no easy way to prevent leader election
20+
/*it should "start a new election round if electionTimeout reached, and no one became Leader" in {
3421
// given
22+
3523
subscribeBeginElection()
3624
37-
candidate.setState(Candidate, data)
38-
candidate.underlyingActor.resetElectionDeadline()
25+
info("Waiting for election to start...")
26+
val msg = awaitElectionStarted()
27+
val candidate = msg.on
3928
4029
// when
41-
awaitBeginElection()
42-
4330
Thread.sleep(electionTimeoutMin.toMillis)
4431
Thread.sleep(electionTimeoutMin.toMillis)
4532
@@ -48,48 +35,75 @@ class CandidateTest extends RaftSpec with BeforeAndAfterEach
4835
eventually {
4936
candidate.stateName should equal (Candidate)
5037
}
51-
}
38+
}*/
5239

5340
it should "go back to Follower state if got message from elected Leader (from later Term)" in {
5441
// given
5542
subscribeBeginElection()
5643

5744
implicit val patienceConfig = PatienceConfig(timeout = scaled(Span(300, Millis)), interval = scaled(Span(50, Millis)))
5845

59-
val entry = Entry(AppendWord("x"), Term(3), 5)
60-
candidate.setState(Candidate, data)
46+
info("Waiting for election to start...")
47+
val msg = awaitElectionStarted()
48+
val candidate = msg.on
49+
val term = msg.term
50+
val nextTerm = term.next
51+
info(s"Member $candidate become a Candidate in $term")
52+
53+
val entry = Entry(AppendWord("x"), nextTerm, 5)
6154

6255
// when
63-
candidate ! AppendEntries(Term(3), Term(2), 6, entry :: Nil, 5)
56+
candidate ! AppendEntries(nextTerm, term, 6, entry :: Nil, 5)
6457

6558
// then
6659
eventually {
6760
// should have reverted to Follower
68-
candidate.stateName === Follower
69-
70-
// and applied the message in Follower
71-
candidate.underlyingActor.replicatedLog.entries contains entry
61+
followers should contain(candidate)
7262
}
7363
}
7464

7565
it should "reject candidate if got RequestVote message with a stale term number" in {
76-
candidate.setState(Candidate, data)
66+
restartMember(leaders.headOption)
67+
subscribeBeginElection()
68+
69+
info("Waiting for election to start...")
70+
val msg = awaitElectionStarted()
71+
val candidate = msg.on
72+
val term = msg.term
73+
val prevTerm = term.prev
74+
info(s"Member $candidate become a Candidate in $term")
75+
76+
info(s"Requesting vote from member with a stale term $prevTerm...")
77+
candidate ! RequestVote(prevTerm, self, prevTerm, 1)
7778

78-
candidate ! RequestVote(Term(1), self, Term(1), 1)
79-
fishForMessage(max = 5 seconds) {
80-
case DeclineCandidate(Term(3)) => true
79+
fishForMessage() {
80+
case DeclineCandidate(msgTerm) if msgTerm == term => true
8181
case _ => false
8282
}
8383
}
8484

85+
8586
it should "reject candidate if got VoteCandidate message with a stale term number" in {
86-
candidate.setState(Candidate, data)
87+
restartMember(leaders.headOption)
88+
subscribeBeginElection()
89+
90+
info("Waiting for election to start...")
91+
val msg = awaitElectionStarted()
92+
val candidate = msg.on
93+
val term = msg.term
94+
val prevTerm = term.prev
95+
info(s"Member $candidate become a Candidate in $term")
96+
97+
info(s"Voting for candidate from member with a stale term $prevTerm...")
98+
candidate ! VoteCandidate(prevTerm)
8799

88-
candidate ! VoteCandidate(Term(1))
89-
fishForMessage(max = 5 seconds) {
90-
case DeclineCandidate(Term(3)) => true
100+
fishForMessage() {
101+
case DeclineCandidate(msgTerm) if msgTerm == term => true
91102
case _ => false
92103
}
93104
}
94105

106+
override def beforeAll(): Unit =
107+
subscribeClusterStateTransitions()
108+
super.beforeAll()
95109
}

src/test/scala/pl/project13/scala/akka/raft/ClusterMembershipChangeTest.scala

+17-13
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package pl.project13.scala.akka.raft
22

33
import pl.project13.scala.akka.raft.protocol._
44

5-
class ClusterMembershipChangeTest extends RaftSpec(callingThreadDispatcher = false) {
5+
class ClusterMembershipChangeTest extends RaftSpec {
66

77
behavior of "Cluster membership change"
88

@@ -13,14 +13,15 @@ class ClusterMembershipChangeTest extends RaftSpec(callingThreadDispatcher = fal
1313

1414
it should "allow to add additional servers" in {
1515
// given
16-
subscribeElectedLeader()
17-
subscribeEntryComitted()
16+
subscribeBeginAsLeader()
17+
val msg = awaitBeginAsLeader()
18+
val initialLeader = msg.ref
1819

19-
awaitElectedLeader()
20+
subscribeEntryComitted()
2021

2122
info("Initial state: ")
2223
infoMemberStates()
23-
val initialLeader = leader()
24+
//val initialLeader = leaders.head
2425

2526
// when
2627
val additionalActor = createActor(s"raft-member-${initialMembers + 1}")
@@ -32,24 +33,27 @@ class ClusterMembershipChangeTest extends RaftSpec(callingThreadDispatcher = fal
3233
// but it's also interesting to see in the logs, if propagation goes on properly, no specific test there
3334
awaitEntryComitted(1)
3435

36+
system.eventStream.unsubscribe(probe.ref)
3537
// then
3638
infoMemberStates()
37-
info("leader : " + leaders().map(simpleName))
38-
info("candidate: " + candidates().map(simpleName))
39-
info("follower : " + followers().map(simpleName))
39+
info("leader : " + leaders.map(_.path.name))
40+
info("candidate: " + candidates.map(_.path.name))
41+
info("follower : " + followers.map(_.path.name))
4042
info("")
4143

42-
additionalActor.stateName should equal (Follower)
43-
4444
eventually {
4545
infoMemberStates()
46-
leaders() should have length 1
47-
candidates() should have length 0
48-
followers() should have length 5
46+
leaders should have length 1
47+
candidates should have length 0
48+
followers should have length 5
4949
}
5050

5151
info("After adding raft-member-6, and configuration change: ")
5252
infoMemberStates()
5353
}
5454

55+
override def beforeAll(): Unit =
56+
subscribeClusterStateTransitions()
57+
super.beforeAll()
58+
5559
}

src/test/scala/pl/project13/scala/akka/raft/ClusterRaftSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import com.typesafe.config.ConfigFactory
66
/**
77
* Base class for tests touching ClusterRaftActor, and clustering in general, but do not need to be multi-jvm yet.
88
*/
9-
abstract class ClusterRaftSpec(_system: ActorSystem) extends RaftSpec(false, Some(_system)) {
9+
abstract class ClusterRaftSpec(_system: ActorSystem) extends RaftSpec(Some(_system)) {
1010
def this() {
1111
this(ActorSystem.apply("rem-syst", ConfigFactory.parseResources("cluster.conf").withFallback(ConfigFactory.load())))
1212
}

0 commit comments

Comments
 (0)