From 60ca6605bb081f99906cff1a21caf75d47e414fa Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Tue, 7 Mar 2023 10:17:34 -0800 Subject: [PATCH] Add Scheduler Queue Metric for Not Processing Any Activations (#5386) * Add Scheduler Queue Metric for Not Processing Any Activations * fix timeout comparison * account for action timeout being longer than queue retention --------- Co-authored-by: Brendan Doyle --- .../org/apache/openwhisk/common/Logging.scala | 7 +++++++ .../core/scheduler/queue/MemoryQueue.scala | 19 ++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala index 2bc3b1c81d7..c2c4bbf5958 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala @@ -612,6 +612,13 @@ object LoggingMarkers { counter, Some(actionWithoutVersion), Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none) + def SCHEDULER_QUEUE_NOT_PROCESSING(namespace: String, actionWithVersion: String, actionWithoutVersion: String) = + LogMarkerToken( + scheduler, + "queueNotProcessing", + counter, + Some(actionWithoutVersion), + Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none) /* * General markers diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index 8602702ddec..d21a6104d97 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu import pureconfig.loadConfigOrThrow import spray.json._ import pureconfig.generic.auto._ -import scala.collection.JavaConverters._ +import scala.collection.JavaConverters._ import java.time.{Duration, Instant} -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.collection.mutable @@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, checkToDropStaleActivation: (Clock, Queue[TimeSeriesActivationEntry], Long, + AtomicLong, String, WhiskActionMetaData, MemoryQueueState, @@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, private[queue] var queue = Queue.empty[TimeSeriesActivationEntry] private[queue] var in = new AtomicInteger(0) + private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli) private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService) private[queue] var averageDuration: Option[Double] = None private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize) @@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, case Event(DropOld, _) => if (queue.nonEmpty && Duration .between(queue.head.timestamp, clock.now()) - .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) { + .compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) { logging.error( this, s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.") @@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, clock, queue, actionRetentionTimeout, + lastActivationPulledTime, invocationNamespace, actionMetaData, stateName, @@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, MetricEmitter.emitHistogramMetric( LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion), totalTimeInScheduler.toMillis) + lastActivationPulledTime.set(Instant.now.toEpochMilli) res.trySuccess(Right(msg)) in.decrementAndGet() stay @@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, MetricEmitter.emitHistogramMetric( LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion), totalTimeInScheduler.toMillis) + lastActivationPulledTime.set(Instant.now.toEpochMilli) sender ! GetActivationResponse(Right(msg)) tryDisableActionThrottling() @@ -1186,6 +1191,7 @@ object MemoryQueue { def checkToDropStaleActivation(clock: Clock, queue: Queue[TimeSeriesActivationEntry], maxRetentionMs: Long, + lastActivationExecutedTime: AtomicLong, invocationNamespace: String, actionMetaData: WhiskActionMetaData, stateName: MemoryQueueState, @@ -1201,6 +1207,13 @@ object MemoryQueue { logging.info( this, s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.") + val timeSinceLastActivationGrabbed = clock.now().toEpochMilli - lastActivationExecutedTime.get() + if (timeSinceLastActivationGrabbed > maxRetentionMs && timeSinceLastActivationGrabbed > actionMetaData.limits.timeout.millis) { + MetricEmitter.emitGaugeMetric( + LoggingMarkers + .SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion), + 1) + } queueRef ! DropOld }