Skip to content

Commit

Permalink
Prevent cycle sending
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed May 27, 2022
1 parent 1a6c99d commit b5f7aaf
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ object LoggingMarkers {
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
// Marker token built from `scheduler`, "queue" and `counter`; it carries no measurement unit.
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
// Marker token built from `scheduler`, "queueCreate" and `start`; it is measured in milliseconds.
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
// Marker token built from `scheduler`, "queueRecover" and `start`; it is measured in milliseconds.
def SCHEDULER_QUEUE_RECOVER = LogMarkerToken(scheduler, "queueRecover", start)(MeasurementUnit.time.milliseconds)
// Marker token built from `scheduler`, "queueUpdate" and `counter`, tagged with the given `reason`;
// it carries no measurement unit.
def SCHEDULER_QUEUE_UPDATE(reason: String) =
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ case class UpdateMemoryQueue(oldAction: DocInfo,
/**
 * Message bundling an activation message with the fully qualified action name and the action metadata.
 *
 * NOTE(review): presumably asks the queue manager to create a new queue for the action; the handler is
 * not fully visible from here, so confirm against the `receive` implementation.
 */
case class CreateNewQueue(activationMessage: ActivationMessage,
action: FullyQualifiedEntityName,
actionMetadata: WhiskActionMetaData)
/**
 * Message asking the queue manager to recover the queue of an action.
 *
 * The queue manager sends it to itself once the action metadata has been fetched for an activation
 * whose queue has to be recovered. On receipt, the activation is forwarded to an already existing
 * leader queue whose revision is at least the activation's revision; otherwise the queue is recreated.
 *
 * @param activationMessage the activation that triggered the recovery and must be delivered to the queue
 * @param action            the fully qualified name of the action whose queue is recovered
 * @param actionMetadata    the metadata of that action, used when the queue has to be recreated
 */
case class RecoverQueue(activationMessage: ActivationMessage,
action: FullyQualifiedEntityName,
actionMetadata: WhiskActionMetaData)

/**
 * Configuration values for the queue manager.
 *
 * NOTE(review): the places where these values are read are not visible here; judging by the names,
 * `maxRetriesToGetQueue` bounds the retries when looking up a queue and `maxSchedulingTime` bounds how
 * long scheduling may take -- confirm against the usages.
 */
case class QueueManagerConfig(maxRetriesToGetQueue: Int, maxSchedulingTime: FiniteDuration)

Expand All @@ -80,7 +83,7 @@ class QueueManager(

private val actorSelectionMap = TrieMap[String, ActorSelection]()

private val leaderElectionCallbacks = TrieMap[String, Either[EtcdFollower, EtcdLeader] => Unit]()
// Callbacks waiting for a leader election result, keyed by leader key. A callback is removed and invoked
// once the election result for its key arrives; it receives the result together with the current
// shutting-down flag of this manager.
private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()

private implicit val askTimeout = Timeout(5.seconds)
private implicit val ec = context.dispatcher
Expand All @@ -90,6 +93,8 @@ class QueueManager(
// watch leaders and register them into actorSelectionMap
watcherService ! WatchEndpoint(QueueKeys.queuePrefix, "", isPrefix = true, watcherName, Set(PutEvent, DeleteEvent))

// Becomes true once GracefulShutdown has been received. Queues that are created or recovered afterwards
// are sent GracefulShutdown right after they are created.
private var isShuttingDown = false

override def receive: Receive = {
case request: CreateQueue if isWarmUpAction(request.fqn) =>
logging.info(
Expand All @@ -114,12 +119,12 @@ class QueueManager(
msg.leadership match {
case Right(EtcdLeader(key, value, lease)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
callback(Right(EtcdLeader(key, value, lease)))
callback(Right(EtcdLeader(key, value, lease)), isShuttingDown)
}

case Left(EtcdFollower(key, value)) =>
leaderElectionCallbacks.remove(key).foreach { callback =>
callback(Left(EtcdFollower(key, value)))
callback(Left(EtcdFollower(key, value)), isShuttingDown)
}
}

Expand All @@ -129,7 +134,11 @@ class QueueManager(
s"Got activation message ${msg.activationId} for ${msg.user.namespace}/${msg.action} from remote queue manager.")(
msg.transid)

handleActivationMessage(msg)
if (sender() == self) {
handleCycle(msg)(msg.transid)
} else {
handleActivationMessage(msg)
}

case UpdateMemoryQueue(oldAction, newAction, msg) =>
logging.info(
Expand Down Expand Up @@ -164,6 +173,24 @@ class QueueManager(
updateInitRevisionMap(getLeaderKey(msg.user.namespace.name.asString, msg.action), msg.revision)
queue ! msg
msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_CREATE)
if (isShuttingDown) {
queue ! GracefulShutdown
}
}

case RecoverQueue(msg, action, actionMetaData) =>
QueuePool.keys.find(_.docInfo.id == action.toDocId) match {
// queue is already recovered or a newer queue is created, send msg to new queue
case Some(key) if key.docInfo.rev >= msg.revision =>
QueuePool.get(key) match {
case Some(queue) if queue.isLeader =>
queue.queue ! msg.copy(revision = key.docInfo.rev)
logging.info(this, s"Queue for action $action is already recovered, skip")(msg.transid)
case _ =>
recreateQueue(action, msg, actionMetaData)
}
case _ =>
recreateQueue(action, msg, actionMetaData)
}

// leaderKey is now optional, it becomes None when the stale queue is removed
Expand Down Expand Up @@ -208,6 +235,7 @@ class QueueManager(
}

case GracefulShutdown =>
isShuttingDown = true
logging.info(this, s"Gracefully shutdown the queue manager")

watcherService ! UnwatchEndpoint(QueueKeys.queuePrefix, isPrefix = true, watcherName)
Expand Down Expand Up @@ -317,6 +345,47 @@ class QueueManager(
}
}

/**
 * Re-creates the memory queue of `action` and hands the pending activation over to it.
 *
 * The queue is created and started through `createAndStartQueue` and then receives `msg`. When this
 * manager is already shutting down, the fresh queue is told to shut down gracefully right away.
 *
 * @param action         fully qualified name of the action whose queue is re-created
 * @param msg            activation message that triggered the recovery; it is forwarded to the new queue
 * @param actionMetaData metadata of the action, passed on when the queue is created
 */
private def recreateQueue(action: FullyQualifiedEntityName,
                          msg: ActivationMessage,
                          actionMetaData: WhiskActionMetaData): Unit = {
  logging.warn(this, s"recreate queue for ${msg.action}")(msg.transid)

  val invocationNamespace = msg.user.namespace.name.asString
  val recreated = createAndStartQueue(invocationNamespace, action, msg.revision, actionMetaData)

  // the activation has to reach the queue before any shutdown signal does
  recreated ! msg
  msg.transid.mark(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)

  if (isShuttingDown) recreated ! GracefulShutdown
}

/**
 * Starts the recovery of a queue for an activation message that this manager received from itself.
 *
 * The action metadata is fetched first. For an executable action a `RecoverQueue` message is sent to
 * this actor, so the queue itself is handled inside the actor's message loop. For a non-executable
 * action, or when the metadata cannot be fetched, the activation is completed with an error.
 *
 * @param msg     the activation message whose queue has to be recovered
 * @param transid the transaction used for the start/finish/failure markers
 * @return a future that completes once the metadata lookup and its follow-up handling are done
 */
private def handleCycle(msg: ActivationMessage)(implicit transid: TransactionId): Future[Any] = {
  logging.warn(this, s"queue for ${msg.action} doesn't exist in memory but exist in etcd, recovering...")
  val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)

  logging.info(this, s"Recover a queue for ${msg.action},")
  val fetchedMetaData = getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, false)

  val recovery = fetchedMetaData.map { metaData: WhiskActionMetaData =>
    if (metaData.toExecutableWhiskAction.isDefined) {
      // hand the actual recovery back to the actor so that it runs inside the message loop
      val versionedAction = msg.action.copy(version = Some(metaData.version))
      self ! RecoverQueue(msg, versionedAction, metaData)
      transid.finished(this, start, s"recovering queue for ${msg.action.toDocId.asDocInfo(metaData.rev)}")
    } else {
      val reason = s"non-executable action: ${msg.action} with rev: ${msg.revision} reached queueManager"
      completeErrorActivation(msg, reason)
      transid.failed(this, start, reason)
    }
  }

  recovery.recover {
    case error =>
      val failure = s"failed to fetch action ${msg.action} with rev: ${msg.revision}, error ${error.getMessage}"
      transid.failed(this, start, failure)
      completeErrorActivation(msg, error.getMessage)
  }
}

private def handleActivationMessage(msg: ActivationMessage): Any = {
implicit val transid = msg.transid

Expand Down Expand Up @@ -451,24 +520,24 @@ class QueueManager(
case None =>
dataManagementService ! ElectLeader(leaderKey, schedulerEndpoints.serialize, self)
leaderElectionCallbacks.put(
leaderKey, {
case Right(EtcdLeader(_, _, _)) =>
val queue = childFactory(
context,
request.invocationNamespace,
request.fqn,
request.revision,
request.whiskActionMetaData)
queue ! Start
QueuePool.put(
MemoryQueueKey(request.invocationNamespace, request.fqn.toDocId.asDocInfo(request.revision)),
MemoryQueueValue(queue, true))
updateInitRevisionMap(leaderKey, request.revision)
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))

// in case of follower, do nothing
case Left(EtcdFollower(_, _)) =>
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
leaderKey,
(electResult, isShuttingDown) => {
electResult match {
case Right(EtcdLeader(_, _, _)) =>
val queue = createAndStartQueue(
request.invocationNamespace,
request.fqn,
request.revision,
request.whiskActionMetaData)
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
if (isShuttingDown) {
queue ! GracefulShutdown
}

// in case of follower, do nothing
case Left(EtcdFollower(_, _)) =>
receiver.foreach(_ ! CreateQueueResponse(request.invocationNamespace, request.fqn, success = true))
}
})

// there is already a leader election for leaderKey, so skip it
Expand All @@ -488,6 +557,20 @@ class QueueManager(
}
}

/**
 * Creates a queue through `childFactory`, starts it and registers it with this scheduler.
 *
 * The queue is sent `Start`, stored in `QueuePool` under the namespace and the action's doc info for the
 * given revision, and the revision is recorded for the action's leader key.
 *
 * @param invocationNamespace namespace the queue is created for
 * @param action              fully qualified name of the action served by the queue
 * @param revision            revision of the action the queue is created with
 * @param actionMetaData      metadata of the action, handed to the queue factory
 * @return the reference of the newly created queue
 */
private def createAndStartQueue(invocationNamespace: String,
                                action: FullyQualifiedEntityName,
                                revision: DocRevision,
                                actionMetaData: WhiskActionMetaData): ActorRef = {
  val newQueue = childFactory(context, invocationNamespace, action, revision, actionMetaData)
  newQueue ! Start

  // register the started queue in the pool
  val poolKey = MemoryQueueKey(invocationNamespace, action.toDocId.asDocInfo(revision))
  val poolValue = MemoryQueueValue(newQueue, true)
  QueuePool.put(poolKey, poolValue)

  // remember the revision this queue was initialised with
  val leaderKey = getLeaderKey(invocationNamespace, action)
  updateInitRevisionMap(leaderKey, revision)

  newQueue
}

// Periodic task that emits the number of leader queues in the pool as a histogram metric; it is scheduled
// with no initial delay and an interval of one second.
private val logScheduler = {
  val emitLeaderQueueCount: Runnable = () => {
    MetricEmitter.emitHistogramMetric(LoggingMarkers.SCHEDULER_QUEUE, QueuePool.countLeader())
  }
  context.system.scheduler.scheduleAtFixedRate(0.seconds, 1.seconds)(emitLeaderQueueCount)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,65 @@ class QueueManagerTests
probe.expectMsg(activationMessage.copy(action = finalFqn, revision = finalRevision))
}

// Scenario: the in-memory queue is removed while its leader key is still present in etcd. Activation
// messages that then reach the queue manager with the manager itself as the sender must lead to exactly
// one recreated queue, which receives all of those messages.
it should "recreate the queue if it's removed by mistake while leader key is not removed from etcd" in {
val mockEtcdClient = mock[EtcdClient]
// every etcd lookup answers with a single entry (key "test") whose value is the serialized scheduler endpoint
(mockEtcdClient
.get(_: String))
.expects(*)
.returning(Future.successful {
RangeResponse
.newBuilder()
.addKvs(KeyValue.newBuilder().setKey("test").setValue(schedulerEndpoint.serialize).build())
.build()
})
.anyNumberOfTimes()
val dataManagementService = getTestDataManagementService()
val watcher = TestProbe()

val probe = TestProbe()

// every queue created by the manager is backed by the same probe, so the probe observes all queue traffic
val childFactory =
(_: ActorRefFactory, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData) => probe.ref

val queueManager =
TestActorRef(
QueueManager
.props(
entityStore,
get,
mockEtcdClient,
schedulerEndpoint,
schedulerId,
dataManagementService.ref,
watcher.ref,
ack,
store,
childFactory,
mockConsumer))

watcher.expectMsg(watchEndpoint)
// current queue's revision is `1-test-revision`
(queueManager ? testQueueCreationMessage).mapTo[CreateQueueResponse].futureValue shouldBe CreateQueueResponse(
testInvocationNamespace,
testFQN,
true)

// the initially created queue is started
probe.expectMsg(Start)

// simulate queue superseded, the queue will be removed but leader key won't be deleted
queueManager ! QueueRemoved(
testInvocationNamespace,
testFQN.toDocId.asDocInfo(testDocRevision),
Some(testLeaderKey))

// sending with the queue manager as the sender makes `sender() == self` hold in the manager,
// which routes the activation to the queue recovery path
queueManager.!(activationMessage)(queueManager)
val msg2 = activationMessage.copy(activationId = ActivationId.generate())
queueManager.!(msg2)(queueManager) // even if two requests are sent, only one queue should be recreated
// exactly one more Start, followed by both activations in the order they were sent
probe.expectMsg(Start)
probe.expectMsg(activationMessage)
probe.expectMsg(msg2)
}

it should "not skip outdated activation when the revision is older than the one in a datastore" in {
stream.reset()
val mockEtcdClient = mock[EtcdClient]
Expand Down Expand Up @@ -1082,6 +1141,9 @@ class QueueManagerTests
val probe = TestProbe()
val fqn2 = FullyQualifiedEntityName(EntityPath("hello1"), EntityName("action1"))
val fqn3 = FullyQualifiedEntityName(EntityPath("hello2"), EntityName("action2"))
val fqn4 = FullyQualifiedEntityName(EntityPath("hello3"), EntityName("action3"))
val fqn5 = FullyQualifiedEntityName(EntityPath("hello4"), EntityName("action4"))
val fqn6 = FullyQualifiedEntityName(EntityPath("hello5"), EntityName("action5"))

// probe will watch all actors which are created by these factories
val childFactory =
Expand Down Expand Up @@ -1129,5 +1191,14 @@ class QueueManagerTests
queueManager ! GracefulShutdown

probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)

// after shutdown, the manager can still create/update/recover a queue, and any new queue should be shut down immediately too
(queueManager ? testQueueCreationMessage.copy(fqn = fqn4))
.mapTo[CreateQueueResponse]
.futureValue shouldBe CreateQueueResponse(testInvocationNamespace, fqn = fqn4, success = true)
queueManager ! CreateNewQueue(activationMessage, fqn5, testActionMetaData)
queueManager ! RecoverQueue(activationMessage, fqn6, testActionMetaData)

probe.expectMsgAllOf(10.seconds, GracefulShutdown, GracefulShutdown, GracefulShutdown)
}
}

0 comments on commit b5f7aaf

Please sign in to comment.