Skip to content

Commit

Permalink
Include workers in state data to thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
KeonHee committed Feb 28, 2021
1 parent 9527740 commit dd4e051
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ case object Active extends KeepAliveServiceState
sealed trait KeepAliveServiceData
case object NoData extends KeepAliveServiceData
case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
case class ActiveStates(worker: Cancellable, lease: Lease) extends KeepAliveServiceData

// Events received by the actor
case object RegrantLease
Expand All @@ -48,7 +49,7 @@ case object GrantLease

// Events internally used
case class SetLease(lease: Lease)
case object SetWatcher
case class SetWatcher(worker: Cancellable)

class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watcherService: ActorRef)(
implicit logging: Logging,
Expand All @@ -59,15 +60,14 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
implicit val ec: ExecutionContextExecutor = context.dispatcher

private val leaseTimeout = loadConfigOrThrow[Int](ConfigKeys.etcdLeaseTimeout).seconds
private var worker: Option[Cancellable] = None
private val key = instanceLease(instanceId)
private val watcherName = "lease-service"

self ! GrantLease
startWith(Ready, NoData)

when(Ready) {
case Event(GrantLease, _) =>
case Event(GrantLease, NoData) =>
etcdClient
.grant(leaseTimeout.toSeconds)
.map { res =>
Expand All @@ -76,14 +76,14 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
.pipeTo(self)
stay

case Event(SetLease(lease), _) =>
case Event(SetLease(lease), NoData) =>
startKeepAliveService(lease)
.pipeTo(self)
logging.info(this, s"Granted a new lease $lease")
stay using lease

case Event(SetWatcher, lease: Lease) =>
goto(Active) using lease
case Event(SetWatcher(w), l: Lease) =>
goto(Active) using ActiveStates(w, l)

case Event(t: FailureMessage, _) =>
logging.warn(this, s"Failed to grant new lease caused by: $t")
Expand All @@ -94,15 +94,15 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
}

when(Active) {
case Event(WatchEndpointRemoved(`key`, `key`, _, false), lease: Lease) =>
case Event(WatchEndpointRemoved(`key`, `key`, _, false), ActiveStates(worker, lease)) =>
logging.info(this, s"endpoint ie removed so recreate a lease")
recreateLease(lease)
recreateLease(worker, lease)

case Event(RegrantLease, lease: Lease) =>
case Event(RegrantLease, ActiveStates(worker, lease)) =>
logging.info(this, s"ReGrant a lease, old lease:${lease}")
recreateLease(lease)
recreateLease(worker, lease)

case Event(GetLease, lease: Lease) =>
case Event(GetLease, ActiveStates(_, lease)) =>
logging.info(this, s"send the lease(${lease}) to ${sender()}")
sender() ! lease
stay()
Expand All @@ -112,18 +112,17 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc

initialize()

private def startKeepAliveService(lease: Lease): Future[SetWatcher.type] = {
worker = Some {
private def startKeepAliveService(lease: Lease): Future[SetWatcher] = {
val worker =
actorSystem.scheduler.schedule(initialDelay = 0.second, interval = 500.milliseconds)(keepAliveOnce(lease))
}

/**
* To verify that lease has been deleted since timeout,
* create a key using lease, watch the key, and receive an event for deletion.
*/
etcdClient.put(key, s"${lease.id}", lease.id).map { _ =>
watcherService ! WatchEndpoint(key, s"${lease.id}", false, watcherName, Set(DeleteEvent))
SetWatcher
SetWatcher(worker)
}
}

Expand All @@ -139,14 +138,14 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
}
}

private def recreateLease(lease: Lease) = {
private def recreateLease(worker: Cancellable, lease: Lease) = {
logging.info(this, s"recreate a lease, old lease: $lease")
worker.foreach(_.cancel()) // stop scheduler
worker.cancel() // stop scheduler
watcherService ! UnwatchEndpoint(key, false, watcherName) // stop watcher
etcdClient
.revoke(lease.id) // delete lease
.onComplete(_ => self ! GrantLease) // create lease
goto(Ready)
goto(Ready) using NoData
}

// Unstash all messages stashed while in intermediate state
Expand All @@ -162,7 +161,10 @@ class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watc
}

override def postStop(): Unit = {
worker.foreach(_.cancel()) // stop scheduler if that exist
stateData match {
case ActiveStates(w, _) => w.cancel() // stop scheduler if that exist
case _ => // do nothing
}
watcherService ! UnwatchEndpoint(key, false, watcherName)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ class LeaseKeepAliveServiceTests

Thread.sleep(1000)
service.stateName shouldBe Active
service.stateData shouldBe testLease
service.stateData shouldBe a[ActiveStates]
service.stateData match {
case ActiveStates(_, lease) => lease shouldBe testLease
case _ => fail()
}
watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, watcherName, Set(DeleteEvent)))

}
Expand Down Expand Up @@ -134,7 +138,11 @@ class LeaseKeepAliveServiceTests
val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, testInstanceId, watcher.ref))

service.stateName shouldBe Active
service.stateData shouldBe testLease
service.stateData shouldBe a[ActiveStates]
service.stateData match {
case ActiveStates(_, lease) => lease shouldBe testLease
case _ => fail()
}
watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, watcherName, Set(DeleteEvent)))

service ! WatchEndpointRemoved(testKey, testKey, testLease.id.toString, false)
Expand All @@ -143,7 +151,11 @@ class LeaseKeepAliveServiceTests
Thread.sleep(500) //wait for the lease to be granted

service.stateName shouldBe Active
service.stateData shouldBe newTestLease
service.stateData shouldBe a[ActiveStates]
service.stateData match {
case ActiveStates(_, lease) => lease shouldBe newTestLease
case _ => fail()
}
watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString, false, watcherName, Set(DeleteEvent)))
}

Expand Down Expand Up @@ -198,14 +210,22 @@ class LeaseKeepAliveServiceTests
val watcher = TestProbe()
val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, testInstanceId, watcher.ref))
service.stateName shouldBe Active
service.stateData shouldBe testLease
service.stateData shouldBe a[ActiveStates]
service.stateData match {
case ActiveStates(_, lease) => lease shouldBe testLease
case _ => fail()
}
watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, watcherName, Set(DeleteEvent)))

watcher.expectMsg(UnwatchEndpoint(testKey, false, watcherName))
Thread.sleep(1500) //wait for the lease to be granted

service.stateName shouldBe Active
service.stateData shouldBe newTestLease
service.stateData shouldBe a[ActiveStates]
service.stateData match {
case ActiveStates(_, lease) => lease shouldBe newTestLease
case _ => fail()
}
watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString, false, watcherName, Set(DeleteEvent)))
}

Expand Down

0 comments on commit dd4e051

Please sign in to comment.