diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetry.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetry.scala index 11d9fb2ab..de253c450 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetry.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetry.scala @@ -56,10 +56,18 @@ final class CloudstateTelemetry(system: ActorSystem) extends Extension { if (settings.enabled) new PrometheusEventSourcedInstrumentation(prometheusRegistry) else NoEventSourcedInstrumentation + val valueBasedInstrumentation: EntityInstrumentation = + if (settings.enabled) new PrometheusEntityInstrumentation(prometheusRegistry) + else NoEntityInstrumentation + def eventSourcedEntityInstrumentation(entityName: String): EventSourcedEntityInstrumentation = if (settings.enabled) new ActiveEventSourcedEntityInstrumentation(entityName, eventSourcedInstrumentation) else NoEventSourcedEntityInstrumentation + def valueBasedInstrumentation(entityName: String): ValueEntityInstrumentation = + if (settings.enabled) new ActiveValueEntityInstrumentation(entityName, valueBasedInstrumentation) + else NoValueEntityInstrumentation + def start(): Unit = if (settings.enabled) { new PrometheusExporter(prometheusRegistry, settings.prometheusHost, settings.prometheusPort)(system).start() diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/EntityInstrumentation.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/EntityInstrumentation.scala new file mode 100644 index 000000000..211e8fa3d --- /dev/null +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/EntityInstrumentation.scala @@ -0,0 +1,367 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.proxy.telemetry + +object EntityInstrumentation { + final case class Context(startTime: Long) + final case class StashContext(stash: Context, restore: Context) +} + +/** + * Instrumentation SPI for value-based entities. + */ +abstract class EntityInstrumentation { + import EntityInstrumentation._ + + /** + * Record entity activated. + * + * @param entityName the entity name + * @return the context that will be passed to [[entityPassivated]] + */ + def entityActivated(entityName: String): Context + + /** + * Record entity passivated (actor stopped). + * + * @param entityName the entity name + * @param context the context passed from [[entityActivated]] + */ + def entityPassivated(entityName: String, context: Context): Unit + + /** + * Record entity failed (unexpected termination or crash). + * + * @param entityName the entity name + */ + def entityFailed(entityName: String): Unit + + /** + * Record entity recovery started (loading state). + * + * @param entityName the entity name + * @return the context that will be passed to [[recoveryCompleted]] + */ + def recoveryStarted(entityName: String): Context + + /** + * Record entity recovery completed (loading state). + * + * @param entityName the entity name + * @param context the context passed from [[recoveryStarted]] + */ + def recoveryCompleted(entityName: String, context: Context): Unit + + /** + * Record entity recovery failed (loading state). 
+ * + * @param entityName the entity name + */ + def recoveryFailed(entityName: String): Unit + + /** + * Record command received. + * + * @param entityName the entity name + * @return the context that will be passed to [[commandCompleted]] + */ + def commandReceived(entityName: String): Context + + /** + * Record command stashed (currently processing). + * + * @param entityName the entity name + * @param context the context from [[commandReceived]] + * @return the context that will be passed to [[commandUnstashed]] + */ + def commandStashed(entityName: String, context: Context): StashContext + + /** + * Record command unstashed (ready to process). + * + * @param entityName the entity name + * @param context the context passed from [[commandStashed]] + */ + def commandUnstashed(entityName: String, context: StashContext): Unit + + /** + * Record command started processing (sending command to user function). + * + * @param entityName the entity name + * @return the context that will be passed to [[commandProcessed]] + */ + def commandStarted(entityName: String): Context + + /** + * Record command completed processing (reply received from user function). + * + * @param entityName the entity name + * @param context the context passed from [[commandStarted]] + */ + def commandProcessed(entityName: String, context: Context): Unit + + /** + * Record command failed. + * + * @param entityName the entity name + */ + def commandFailed(entityName: String): Unit + + /** + * Record command fully completed (including processing and persisting). + * + * @param entityName the entity name + * @param context the context passed from [[commandReceived]] + */ + def commandCompleted(entityName: String, context: Context): Unit + + /** + * Record entity persist state started. + * + * @param entityName the entity name + * @return the context that will be passed to [[persistCompleted]] + */ + def persistStarted(entityName: String): Context + + /** + * Record entity persist state completed. 
+ * + * @param entityName the entity name + * @param context the context passed from [[persistStarted]] + */ + def persistCompleted(entityName: String, context: Context): Unit + + /** + * Record entity persist state failed. + * + * @param entityName the entity name + */ + def persistFailed(entityName: String): Unit + + /** + * Record state persisted to storage. + * + * @param entityName the entity name + * @param size the serialized size of the state + */ + def statePersisted(entityName: String, size: Int): Unit + + /** + * Record state loaded from storage. + * + * @param entityName the entity name + * @param size the serialized size of the state + */ + def stateLoaded(entityName: String, size: Int): Unit + + /** + * Record entity delete state started. + * + * @param entityName the entity name + * @return the context that will be passed to [[deleteCompleted]] + */ + def deleteStarted(entityName: String): Context + + /** + * Record entity delete state completed. + * + * @param entityName the entity name + * @param context the context passed from [[deleteStarted]] + */ + def deleteCompleted(entityName: String, context: Context): Unit + + /** + * Record entity delete state failed. 
+ * + * @param entityName the entity name + */ + def deleteFailed(entityName: String): Unit +} + +/** + * No-op [[EntityInstrumentation]]: every recording method does nothing, + * and every method that returns a context returns null. + */ +object NoEntityInstrumentation extends EntityInstrumentation { + import EntityInstrumentation._ + + override def entityActivated(entityName: String): Context = null + override def entityPassivated(entityName: String, context: Context): Unit = () + override def entityFailed(entityName: String): Unit = () + override def recoveryStarted(entityName: String): Context = null + override def recoveryCompleted(entityName: String, context: Context): Unit = () + override def recoveryFailed(entityName: String): Unit = () + override def commandReceived(entityName: String): Context = null + override def commandStashed(entityName: String, context: Context): StashContext = null + override def commandUnstashed(entityName: String, context: StashContext): Unit = () + override def commandStarted(entityName: String): Context = null + override def commandProcessed(entityName: String, context: Context): Unit = () + override def commandFailed(entityName: String): Unit = () + override def commandCompleted(entityName: String, context: Context): Unit = () + override def persistStarted(entityName: String): Context = null + override def persistCompleted(entityName: String, context: Context): Unit = () + override def persistFailed(entityName: String): Unit = () + override def statePersisted(entityName: String, size: Int): Unit = () + override def stateLoaded(entityName: String, size: Int): Unit = () + override def deleteStarted(entityName: String): Context = null + override def deleteCompleted(entityName: String, context: Context): Unit = () + override def deleteFailed(entityName: String): Unit = () +} + +/** + * Instrumentation wrapper for an entity instance. 
+ */ +abstract class ValueEntityInstrumentation { + import EntityInstrumentation._ + + def entityActivated(): Unit + def entityPassivated(): Unit + def entityFailed(): Unit + def recoveryStarted(): Unit + def recoveryCompleted(): Unit + def recoveryFailed(): Unit + def commandReceived(): Unit + def commandStashed(): StashContext + def commandUnstashed(context: StashContext): Unit + def commandStarted(): Unit + def commandProcessed(): Unit + def commandFailed(): Unit + def commandCompleted(): Unit + def persistStarted(): Unit + def persistCompleted(): Unit + def persistFailed(): Unit + def statePersisted(size: Int): Unit + def stateLoaded(size: Int): Unit + def deleteStarted(): Unit + def deleteCompleted(): Unit + def deleteFailed(): Unit +} + +/** + * Active instrumentation wrapper for an entity instance. + * Stores instrumentation contexts local to an entity. + */ +class ActiveValueEntityInstrumentation(entityName: String, valueBased: EntityInstrumentation) + extends ValueEntityInstrumentation { + import EntityInstrumentation._ + + // only one of these contexts is active at a time (stashed commands are stored in the stash queue) + private[this] var entityContext: Context = _ + private[this] var recoveryContext: Context = _ + private[this] var receiveContext: Context = _ + private[this] var startContext: Context = _ + private[this] var persistContext: Context = _ + private[this] var deleteContext: Context = _ + + override def entityActivated(): Unit = + entityContext = valueBased.entityActivated(entityName) + + override def entityPassivated(): Unit = { + valueBased.entityPassivated(entityName, entityContext) + entityContext = null + } + + override def entityFailed(): Unit = + valueBased.entityFailed(entityName) + + override def recoveryStarted(): Unit = + recoveryContext = valueBased.recoveryStarted(entityName) + + override def recoveryCompleted(): Unit = { + valueBased.recoveryCompleted(entityName, recoveryContext) + recoveryContext = null + } + + override def 
recoveryFailed(): Unit = + valueBased.recoveryFailed(entityName) + + override def commandReceived(): Unit = + receiveContext = valueBased.commandReceived(entityName) + + override def commandStashed(): StashContext = + valueBased.commandStashed(entityName, valueBased.commandReceived(entityName)) + + override def commandUnstashed(context: StashContext): Unit = { + valueBased.commandUnstashed(entityName, context) + receiveContext = context.restore + } + + override def commandStarted(): Unit = + startContext = valueBased.commandStarted(entityName) + + override def commandProcessed(): Unit = { + valueBased.commandProcessed(entityName, startContext) + startContext = null + } + + override def commandFailed(): Unit = + valueBased.commandFailed(entityName) + + override def commandCompleted(): Unit = { + valueBased.commandCompleted(entityName, receiveContext) + receiveContext = null + } + + override def persistStarted(): Unit = + persistContext = valueBased.persistStarted(entityName) + + override def persistCompleted(): Unit = { + valueBased.persistCompleted(entityName, persistContext) + persistContext = null + } + + override def persistFailed(): Unit = + valueBased.persistFailed(entityName) + + override def statePersisted(size: Int): Unit = + valueBased.statePersisted(entityName, size) + + override def stateLoaded(size: Int): Unit = + valueBased.stateLoaded(entityName, size) + + override def deleteStarted(): Unit = + deleteContext = valueBased.deleteStarted(entityName) + + override def deleteCompleted(): Unit = { + valueBased.deleteCompleted(entityName, deleteContext) + deleteContext = null + } + + override def deleteFailed(): Unit = valueBased.deleteFailed(entityName) +} + +object NoValueEntityInstrumentation extends ValueEntityInstrumentation { + import EntityInstrumentation._ + + override def entityActivated(): Unit = () + override def entityPassivated(): Unit = () + override def entityFailed(): Unit = () + override def recoveryStarted(): Unit = () + override def 
recoveryCompleted(): Unit = () + override def recoveryFailed(): Unit = () + override def commandReceived(): Unit = () + override def commandStashed(): StashContext = null + override def commandUnstashed(context: StashContext): Unit = () + override def commandStarted(): Unit = () + override def commandProcessed(): Unit = () + override def commandFailed(): Unit = () + override def commandCompleted(): Unit = () + override def persistStarted(): Unit = () + override def persistCompleted(): Unit = () + override def persistFailed(): Unit = () + override def statePersisted(size: Int): Unit = () + override def stateLoaded(size: Int): Unit = () + override def deleteStarted(): Unit = () + override def deleteCompleted(): Unit = () + override def deleteFailed(): Unit = () +} diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/PrometheusEntityInstrumentation.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/PrometheusEntityInstrumentation.scala new file mode 100644 index 000000000..8c905f387 --- /dev/null +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/telemetry/PrometheusEntityInstrumentation.scala @@ -0,0 +1,296 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.cloudstate.proxy.telemetry + +import io.prometheus.client.{CollectorRegistry, Counter, Histogram, Summary} + +object PrometheusEntityInstrumentation { + object MetricName { + final val ActivatedEntitiesTotal = "cloudstate_valuebased_activated_entities_total" + final val PassivatedEntitiesTotal = "cloudstate_valuebased_passivated_entities_total" + final val EntityActiveTimeSeconds = "cloudstate_valuebased_entity_active_time_seconds" + final val FailedEntitiesTotal = "cloudstate_valuebased_failed_entities_total" + final val RecoveryTimeSeconds = "cloudstate_valuebased_recovery_time_seconds" + final val RecoveryFailedTotal = "cloudstate_valuebased_recovery_failed_total" + final val ReceivedCommandsTotal = "cloudstate_valuebased_received_commands_total" + final val StashedCommandsTotal = "cloudstate_valuebased_stashed_commands_total" + final val UnstashedCommandsTotal = "cloudstate_valuebased_unstashed_commands_total" + final val CommandStashTimeSeconds = "cloudstate_valuebased_command_stash_time_seconds" + final val CommandProcessingTimeSeconds = "cloudstate_valuebased_command_processing_time_seconds" + final val FailedCommandsTotal = "cloudstate_valuebased_failed_commands_total" + final val CompletedCommandsTotal = "cloudstate_valuebased_completed_commands_total" + final val CommandTotalTimeSeconds = "cloudstate_valuebased_command_total_time_seconds" + final val PersistTimeSeconds = "cloudstate_valuebased_persist_time_seconds" + final val PersistFailedTotal = "cloudstate_valuebased_persist_failed_total" + final val PersistedStatesTotal = "cloudstate_valuebased_persisted_states_total" + final val PersistedStateBytesTotal = "cloudstate_valuebased_persisted_state_bytes_total" + final val LoadedStatesTotal = "cloudstate_valuebased_loaded_state_total" + final val LoadedStateBytesTotal = "cloudstate_valuebased_loaded_state_bytes_total" + final val DeleteTimeSeconds = "cloudstate_valuebased_delete_time_seconds" + final val DeleteFailedTotal = 
"cloudstate_valuebased_delete_failed_total" + } + + object MetricLabel { + final val EntityName = "entity_name" + } + + final val NanosecondsPerSecond = 1e9 +} + +class PrometheusEntityInstrumentation(registry: CollectorRegistry) extends EntityInstrumentation { + import EntityInstrumentation._ + import PrometheusEntityInstrumentation._ + + // Notes: + // Using Histograms for command timing metrics, with the default buckets (from 5ms up to 10s). + // Timing metrics are in seconds, using the Prometheus convention of base units. + // Not using Summary for command timing metrics, as it currently has terrible performance. + // HDRHistogram-backed Summary still coming: https://github.com/prometheus/client_java/pull/484 + + private val activatedEntitiesTotal: Counter = Counter.build + .name(MetricName.ActivatedEntitiesTotal) + .help("Total entity instances that have activated.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val passivatedEntitiesTotal: Counter = Counter.build + .name(MetricName.PassivatedEntitiesTotal) + .help("Total entity instances that have passivated.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val entityActiveTimeSeconds: Summary = Summary.build + .name(MetricName.EntityActiveTimeSeconds) + .help("Duration that entities are active, in seconds.") + .labelNames(MetricLabel.EntityName) + .quantile(0.50, 0.050) // 50th percentile (median), 5% error + .quantile(0.95, 0.010) // 95th percentile, 1% error + .quantile(0.99, 0.001) // 99th percentile, 0.1% error + .quantile(1.00, 0.000) // 100th percentile (max), 0% error + .maxAgeSeconds(60) + .ageBuckets(6) + .register(registry) + + private val failedEntitiesTotal: Counter = Counter.build + .name(MetricName.FailedEntitiesTotal) + .help("Total entity instances that have failed.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val recoveryTimeSeconds: Histogram = Histogram.build + .name(MetricName.RecoveryTimeSeconds) + 
.help("Duration for entity recovery, in seconds.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val recoveryFailedTotal: Counter = Counter.build + .name(MetricName.RecoveryFailedTotal) + .help("Total recovery process that have failed.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val receivedCommandsTotal: Counter = Counter.build + .name(MetricName.ReceivedCommandsTotal) + .help("Total commands received for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val stashedCommandsTotal: Counter = Counter.build + .name(MetricName.StashedCommandsTotal) + .help("Total commands stashed for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val unstashedCommandsTotal: Counter = Counter.build + .name(MetricName.UnstashedCommandsTotal) + .help("Total commands unstashed for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val commandStashTimeSeconds: Histogram = Histogram.build + .name(MetricName.CommandStashTimeSeconds) + .help("Duration that commands are stashed, in seconds.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val commandProcessingTimeSeconds: Histogram = Histogram.build + .name(MetricName.CommandProcessingTimeSeconds) + .help("Duration for command processing (until user function reply), in seconds.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val failedCommandsTotal: Counter = Counter.build + .name(MetricName.FailedCommandsTotal) + .help("Total commands that have failed.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val completedCommandsTotal: Counter = Counter.build + .name(MetricName.CompletedCommandsTotal) + .help("Total commands completed for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val commandTotalTimeSeconds: Histogram = Histogram.build + 
.name(MetricName.CommandTotalTimeSeconds) + .help("Total duration for command handling (including stash and persist), in seconds.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val persistTimeSeconds: Histogram = Histogram.build + .name(MetricName.PersistTimeSeconds) + .help("Duration for persisting state from a command, in seconds.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val persistFailedTotal: Counter = Counter.build + .name(MetricName.PersistFailedTotal) + .help("Total persist operations that have failed.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val persistedStatesTotal: Counter = Counter.build + .name(MetricName.PersistedStatesTotal) + .help("Total states persisted for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val persistedStateBytesTotal: Counter = Counter.build + .name(MetricName.PersistedStateBytesTotal) + .help("Total state sizes persisted for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val loadedStatesTotal: Counter = Counter.build + .name(MetricName.LoadedStatesTotal) + .help("Total states loaded for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val loadedStateBytesTotal: Counter = Counter.build + .name(MetricName.LoadedStateBytesTotal) + .help("Total states sizes loaded for an entity.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val deleteTimeSeconds: Histogram = Histogram.build + .name(MetricName.DeleteTimeSeconds) + .help("Duration for deleting state from a command, in seconds.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private val deleteFailedTotal: Counter = Counter.build + .name(MetricName.DeleteFailedTotal) + .help("Total delete operations that have failed.") + .labelNames(MetricLabel.EntityName) + .register(registry) + + private def now: Long = System.nanoTime() + + private def 
elapsedSeconds(context: Context): Double = + (now - context.startTime) / NanosecondsPerSecond + + private def startSpan(): Context = Context(startTime = now) + + private def startSpan(entityName: String, counter: Counter): Context = { + counter.labels(entityName).inc() + startSpan() + } + + private def endSpan(entityName: String, timer: Histogram, context: Context): Unit = + timer.labels(entityName).observe(elapsedSeconds(context)) + + private def endSpan(entityName: String, counter: Counter, timer: Histogram, context: Context): Unit = { + counter.labels(entityName).inc() + endSpan(entityName, timer, context) + } + + private def endSpan(entityName: String, timer: Summary, context: Context): Unit = + timer.labels(entityName).observe(elapsedSeconds(context)) + + private def endSpan(entityName: String, counter: Counter, timer: Summary, context: Context): Unit = { + counter.labels(entityName).inc() + endSpan(entityName, timer, context) + } + + override def entityActivated(entityName: String): Context = + startSpan(entityName, activatedEntitiesTotal) + + override def entityPassivated(entityName: String, context: Context): Unit = + endSpan(entityName, passivatedEntitiesTotal, entityActiveTimeSeconds, context) + + override def entityFailed(entityName: String): Unit = + failedEntitiesTotal.labels(entityName).inc() + + override def recoveryStarted(entityName: String): Context = + startSpan() + + override def recoveryCompleted(entityName: String, context: Context): Unit = + endSpan(entityName, recoveryTimeSeconds, context) + + override def recoveryFailed(entityName: String): Unit = + recoveryFailedTotal.labels(entityName).inc() + + override def commandReceived(entityName: String): Context = + startSpan(entityName, receivedCommandsTotal) + + override def commandStashed(entityName: String, context: Context): StashContext = + StashContext(stash = startSpan(entityName, stashedCommandsTotal), restore = context) + + override def commandUnstashed(entityName: String, context: 
StashContext): Unit = + endSpan(entityName, unstashedCommandsTotal, commandStashTimeSeconds, context.stash) + + override def commandStarted(entityName: String): Context = + startSpan() + + override def commandProcessed(entityName: String, context: Context): Unit = + endSpan(entityName, commandProcessingTimeSeconds, context) + + override def commandFailed(entityName: String): Unit = + failedCommandsTotal.labels(entityName).inc() + + override def commandCompleted(entityName: String, context: Context): Unit = + endSpan(entityName, completedCommandsTotal, commandTotalTimeSeconds, context) + + override def persistStarted(entityName: String): Context = + startSpan() + + override def persistCompleted(entityName: String, context: Context): Unit = + endSpan(entityName, persistTimeSeconds, context) + + override def persistFailed(entityName: String): Unit = + persistFailedTotal.labels(entityName).inc() + + override def statePersisted(entityName: String, size: Int): Unit = { + persistedStatesTotal.labels(entityName).inc() + persistedStateBytesTotal.labels(entityName).inc(size) + } + + override def stateLoaded(entityName: String, size: Int): Unit = { + loadedStatesTotal.labels(entityName).inc() + loadedStateBytesTotal.labels(entityName).inc(size) + } + + override def deleteStarted(entityName: String): Context = + startSpan() + + override def deleteCompleted(entityName: String, context: Context): Unit = + endSpan(entityName, deleteTimeSeconds, context) + + override def deleteFailed(entityName: String): Unit = + deleteFailedTotal.labels(entityName).inc() +} diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/valueentity/Entity.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/valueentity/Entity.scala index 333f05356..0b4149567 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/valueentity/Entity.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/valueentity/Entity.scala @@ -29,6 +29,7 @@ import akka.util.Timeout import io.cloudstate.protocol.entity._ 
import io.cloudstate.protocol.value_entity._ import io.cloudstate.proxy.entity.{EntityCommand, UserFunctionReply} +import io.cloudstate.proxy.telemetry.CloudstateTelemetry import io.cloudstate.proxy.valueentity.store.Repository import io.cloudstate.proxy.valueentity.store.Store.Key @@ -212,19 +213,27 @@ final class ValueEntity(configuration: ValueEntity.Configuration, with Stash with ActorLogging { + import io.cloudstate.proxy.telemetry.EntityInstrumentation.StashContext + private implicit val ec = context.dispatcher private val persistenceId: String = configuration.userFunctionName + entityId private val actorId = ValueEntity.actorCounter.incrementAndGet() - private[this] final var stashedCommands = Queue.empty[(EntityCommand, ActorRef)] // PERFORMANCE: look at options for data structures + private[this] final var stashedCommands = Queue.empty[(EntityCommand, ActorRef, StashContext)] // PERFORMANCE: look at options for data structures private[this] final var currentCommand: ValueEntity.OutstandingCommand = null private[this] final var stopped = false private[this] final var idCounter = 0L private[this] final var inited = false private[this] final var commandStartTime = 0L + private[this] val instrumentation = + CloudstateTelemetry(context.system).valueBasedInstrumentation(configuration.userFunctionName) + + instrumentation.entityActivated() + instrumentation.recoveryStarted() + // Set up passivation timer context.setReceiveTimeout(configuration.passivationTimeout.duration) @@ -232,6 +241,9 @@ final class ValueEntity(configuration: ValueEntity.Configuration, repository .get(Key(persistenceId, entityId)) .map { state => + instrumentation.recoveryCompleted() + /* side effect only: record the serialized size of the loaded state, if there is one */ + state.foreach(s => instrumentation.stateLoaded(s.serializedSize)) + if (!inited) { relay ! 
ValueEntityStreamIn( ValueEntityStreamIn.Message.Init( @@ -248,20 +260,27 @@ final class ValueEntity(configuration: ValueEntity.Configuration, } } .recover { - case error => ValueEntity.ReadStateFailure(error) + case error => + instrumentation.recoveryFailed() + instrumentation.recoveryCompleted() + ValueEntity.ReadStateFailure(error) } .pipeTo(self) - override final def postStop(): Unit = + override final def postStop(): Unit = { if (currentCommand != null) { log.warning("Stopped but we have a current action id {}", currentCommand.actionId) } + instrumentation.entityPassivated() + } private[this] final def commandHandled(): Unit = { currentCommand = null + instrumentation.commandCompleted() if (stashedCommands.nonEmpty) { - val ((request, sender), newStashedCommands) = stashedCommands.dequeue + val ((request, sender, stashContext), newStashedCommands) = stashedCommands.dequeue stashedCommands = newStashedCommands + instrumentation.commandUnstashed(stashContext) handleCommand(request, sender) } else if (stopped) { context.stop(self) @@ -269,13 +288,14 @@ final class ValueEntity(configuration: ValueEntity.Configuration, } private[this] final def notifyOutstandingRequests(msg: String): Unit = { + instrumentation.entityFailed() currentCommand match { case null => case req => req.replyTo ! createFailure(msg) } val errorNotification = createFailure("Value entity terminated") stashedCommands.foreach { - case (_, replyTo) => replyTo ! errorNotification + case (_, replyTo, _) => replyTo ! 
errorNotification } } @@ -286,6 +306,7 @@ final class ValueEntity(configuration: ValueEntity.Configuration, } private[this] final def handleCommand(entityCommand: EntityCommand, sender: ActorRef): Unit = { + instrumentation.commandStarted() idCounter += 1 val command = Command( entityId = entityId, @@ -326,9 +347,11 @@ final class ValueEntity(configuration: ValueEntity.Configuration, private def running: Receive = { case command: EntityCommand if currentCommand != null => - stashedCommands = stashedCommands.enqueue((command, sender())) + stashedCommands = stashedCommands.enqueue((command, sender(), instrumentation.commandStashed())) case command: EntityCommand => + log.debug("Value entity [{}] [{}] received command [{}]", configuration.serviceName, entityId, command.name) + instrumentation.commandReceived() handleCommand(command, sender()) case ValueEntityStreamOut(m, _) => @@ -343,6 +366,13 @@ final class ValueEntity(configuration: ValueEntity.Configuration, s"(expected id ${currentCommand.commandId} but got ${r.commandId}) - $r") case ValueEntitySOMsg.Reply(r) => + instrumentation.commandProcessed() + r.clientAction match { + case Some(ClientAction(ClientAction.Action.Failure(_), _)) => + instrumentation.commandFailed() + case _ => + } + val commandId = currentCommand.commandId if (r.stateAction.isEmpty) { currentCommand.replyTo ! 
valueEntityReplyToUfReply(r) @@ -371,6 +401,9 @@ final class ValueEntity(configuration: ValueEntity.Configuration, s"(expected id ${currentCommand.commandId} but got ${f.commandId}) - ${f.description}") case ValueEntitySOMsg.Failure(f) => + instrumentation.commandFailed() + instrumentation.commandProcessed() + instrumentation.commandCompleted() try crash("Unexpected Value entity failure", f.description) finally currentCommand = null // clear command after notifications @@ -412,25 +445,36 @@ final class ValueEntity(configuration: ValueEntity.Configuration, action.action match { case Update(ValueEntityUpdate(Some(value), _)) => + instrumentation.persistStarted() + instrumentation.statePersisted(value.serializedSize) repository .update(Key(persistenceId, entityId), value) .map { _ => + instrumentation.persistCompleted() handler() ValueEntity.WriteStateSuccess } .recover { - case error => ValueEntity.WriteStateFailure(error) + case error => + instrumentation.persistFailed() + instrumentation.persistCompleted() + ValueEntity.WriteStateFailure(error) } case Delete(_) => + instrumentation.deleteStarted() repository .delete(Key(persistenceId, entityId)) .map { _ => + instrumentation.deleteCompleted() handler() ValueEntity.WriteStateSuccess } .recover { - case error => ValueEntity.WriteStateFailure(error) + case error => + instrumentation.deleteFailed() + instrumentation.deleteCompleted() + ValueEntity.WriteStateFailure(error) } } } diff --git a/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetrySpec.scala b/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetrySpec.scala index 845274e70..a50e17721 100644 --- a/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetrySpec.scala +++ b/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/CloudstateTelemetrySpec.scala @@ -69,7 +69,7 @@ class CloudstateTelemetrySpec extends AbstractTelemetrySpec { CloudstateTelemetry(system).start() } val metrics = 
scrape("http://localhost:9090")(_.mkString) - metrics should include("# TYPE cloudstate_eventsourced") + metrics should (include("# TYPE cloudstate_eventsourced") and include("# TYPE cloudstate_valuebased")) } "allow Prometheus metrics port to be configured" in withTestRegistry( @@ -83,7 +83,7 @@ class CloudstateTelemetrySpec extends AbstractTelemetrySpec { CloudstateTelemetry(system).start() } val metrics = scrape("http://localhost:9999")(_.mkString) - metrics should include("# TYPE cloudstate_eventsourced") + metrics should (include("# TYPE cloudstate_eventsourced") and include("# TYPE cloudstate_valuebased")) } "bind event-sourced instrumentation to Prometheus by default" in withTestRegistry() { testKit => @@ -109,5 +109,29 @@ class CloudstateTelemetrySpec extends AbstractTelemetrySpec { CloudstateTelemetry(testKit.system) .eventSourcedEntityInstrumentation("name") should be theSameInstanceAs NoEventSourcedEntityInstrumentation } + + "bind value-based instrumentation to Prometheus by default" in withTestRegistry() { testKit => + CloudstateTelemetry(testKit.system).valueBasedInstrumentation shouldBe a[PrometheusEntityInstrumentation] + } + + "use noop value-based instrumentation when disabled" in withTestKit(""" + | cloudstate.proxy.telemetry.disabled = true + """) { testKit => + CloudstateTelemetry(testKit.system).valueBasedInstrumentation should be theSameInstanceAs NoEntityInstrumentation + } + + "bind active value-based entity instrumentation by default" in withTestRegistry() { testKit => + CloudstateTelemetry(testKit.system) + .valueBasedInstrumentation("name") shouldBe an[ActiveValueEntityInstrumentation] + } + + "use noop value-based entity instrumentation when disabled" in withTestKit( + """ + | cloudstate.proxy.telemetry.disabled = true + """ + ) { testKit => + CloudstateTelemetry(testKit.system) + .valueBasedInstrumentation("name") should be theSameInstanceAs NoValueEntityInstrumentation + } } } diff --git 
a/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/EntityInstrumentationSpec.scala b/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/EntityInstrumentationSpec.scala new file mode 100644 index 000000000..7602f5c89 --- /dev/null +++ b/proxy/core/src/test/scala/io/cloudstate/proxy/telemetry/EntityInstrumentationSpec.scala @@ -0,0 +1,211 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.proxy.telemetry + +import akka.actor.ActorRef +import akka.grpc.GrpcClientSettings +import akka.testkit.EventFilter +import akka.testkit.TestEvent.Mute +import com.google.protobuf.ByteString +import com.google.protobuf.any.{Any => ProtoAny} +import io.cloudstate.protocol.entity.{ClientAction, Failure} +import io.cloudstate.protocol.value_entity.ValueEntityClient +import io.cloudstate.proxy.entity.{EntityCommand, UserFunctionReply} +import io.cloudstate.proxy.valueentity.store.{InMemoryStore, RepositoryImpl} +import io.cloudstate.proxy.valueentity.{ValueEntity, ValueEntitySupervisor} +import io.cloudstate.testkit.TestService +import io.cloudstate.testkit.valueentity.ValueEntityMessages +import io.prometheus.client.CollectorRegistry + +import scala.concurrent.duration._ + +class EntityInstrumentationSpec extends AbstractTelemetrySpec { + + "EntityInstrumentation" should { + + "record value-based entity metrics" in withTestRegistry( + """ + | include "test-in-memory" + | akka { + | loglevel = DEBUG 
+ | loggers = ["akka.testkit.TestEventListener"] + | remote.artery.canonical.port = 0 + | remote.artery.bind.port = "" + | } + """ + ) { testKit => + import PrometheusEntityInstrumentation.MetricLabel._ + import PrometheusEntityInstrumentation.MetricName._ + import ValueEntityMessages._ + import testKit._ + import testKit.system.dispatcher + + // silence any dead letters or unhandled messages during shutdown (when using test event listener) + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter.*"))) + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*unhandled message.*"))) + + // simulate user function interaction with value-based entity to validate instrumentation + implicit val registry: CollectorRegistry = CloudstateTelemetry(system).prometheusRegistry + + implicit val replyTo: ActorRef = testActor + + val service = TestService() + val client = + ValueEntityClient(GrpcClientSettings.connectToServiceAt("localhost", service.port).withTls(false)) + val entityConfiguration = ValueEntity.Configuration( + serviceName = "service", + userFunctionName = "test", + passivationTimeout = 30.seconds, + sendQueueSize = 100 + ) + val repository = new RepositoryImpl(new InMemoryStore(system)) + + val entity = system.actorOf(ValueEntitySupervisor.props(client, entityConfiguration, repository), "entity") + watch(entity) + + val emptyCommand = Some(protobufAny(EmptyJavaMessage)) + val entityState = ProtoAny("state", ByteString.copyFromUtf8("state")) + + // init with empty state + val connection = service.valueEntity.expectConnection() + connection.expect(init("service", "entity")) + + metricValue(ActivatedEntitiesTotal, EntityName -> "test") shouldBe 1 + metricValue(LoadedStatesTotal, EntityName -> "test") shouldBe 0.0 // empty state + metricValue(LoadedStateBytesTotal, EntityName -> "test") shouldBe 0.0 // empty state + metricValue(RecoveryTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(RecoveryTimeSeconds + 
"_sum", EntityName -> "test") should be > 0.0 + + // update command is executed + entity ! EntityCommand(entityId = "test", name = "updateCommand", emptyCommand) + connection.expect(command(1, "entity", "updateCommand")) + metricValue(ReceivedCommandsTotal, EntityName -> "test") shouldBe 1 + + // get command is stashed + entity ! EntityCommand(entityId = "test", name = "getCommand", emptyCommand) + + eventually(timeout(5.seconds), interval(100.millis)) { + metricValue(ReceivedCommandsTotal, EntityName -> "test") shouldBe 2 + metricValue(StashedCommandsTotal, EntityName -> "test") shouldBe 1 + } + + // send reply for update command + connection.send(reply(1, EmptyJavaMessage, update(entityState))) + expectMsg(UserFunctionReply(clientActionReply(messagePayload(EmptyJavaMessage)))) + + metricValue(CommandProcessingTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(CommandProcessingTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + metricValue(CommandTotalTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(CommandTotalTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + metricValue(PersistedStatesTotal, EntityName -> "test") should be > 0.0 + metricValue(PersistedStateBytesTotal, EntityName -> "test") should be > 0.0 + metricValue(PersistTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(PersistTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + // get command is unstashed + metricValue(CompletedCommandsTotal, EntityName -> "test") shouldBe 1 + metricValue(UnstashedCommandsTotal, EntityName -> "test") shouldBe 1 + metricValue(CommandStashTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(CommandStashTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + connection.expect(command(2, "entity", "getCommand")) + + // send reply for get command + connection.send(reply(2, EmptyJavaMessage)) + 
expectMsg(UserFunctionReply(clientActionReply(messagePayload(EmptyJavaMessage)))) + + metricValue(CompletedCommandsTotal, EntityName -> "test") shouldBe 2 + metricValue(CommandProcessingTimeSeconds + "_count", EntityName -> "test") shouldBe 2 + metricValue(CommandProcessingTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + metricValue(CommandTotalTimeSeconds + "_count", EntityName -> "test") shouldBe 2 + metricValue(CommandTotalTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + // passivate the entity + entity ! ValueEntity.Stop + connection.expectClosed() + expectTerminated(entity) + + metricValue(PassivatedEntitiesTotal, EntityName -> "test") shouldBe 1 + metricValue(EntityActiveTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(EntityActiveTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + // reactivate the entity + val reactivatedEntity = + system.actorOf(ValueEntitySupervisor.props(client, entityConfiguration, repository), "entity") + watch(reactivatedEntity) + val connection2 = service.valueEntity.expectConnection() + + connection2.expect(init("service", "entity", state(entityState))) + + metricValue(ActivatedEntitiesTotal, EntityName -> "test") shouldBe 2 + metricValue(LoadedStatesTotal, EntityName -> "test") shouldBe 1 + metricValue(LoadedStateBytesTotal, EntityName -> "test") should be > 0.0 + metricValue(RecoveryTimeSeconds + "_count", EntityName -> "test") shouldBe 2 + metricValue(RecoveryTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + // send delete command + reactivatedEntity ! 
EntityCommand(entityId = "test", name = "deleteCommand", emptyCommand) + connection2.expect(command(1, "entity", "deleteCommand")) + connection2.send(reply(1, EmptyJavaMessage, delete())) + expectMsg(UserFunctionReply(clientActionReply(messagePayload(EmptyJavaMessage)))) + + metricValue(ReceivedCommandsTotal, EntityName -> "test") shouldBe 3 + metricValue(CompletedCommandsTotal, EntityName -> "test") shouldBe 3 + metricValue(CommandProcessingTimeSeconds + "_count", EntityName -> "test") shouldBe 3 + metricValue(CommandProcessingTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + metricValue(CommandTotalTimeSeconds + "_count", EntityName -> "test") shouldBe 3 + metricValue(CommandTotalTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + metricValue(DeleteTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(DeleteTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + // send a command that fails + reactivatedEntity ! EntityCommand(entityId = "test", name = "command1", emptyCommand) + connection2.expect(command(2, "entity", "command1")) + connection2.send(actionFailure(2, "failure")) + expectMsg(UserFunctionReply(Some(ClientAction(ClientAction.Action.Failure(Failure(2, "failure")))))) + + metricValue(ReceivedCommandsTotal, EntityName -> "test") shouldBe 4 + metricValue(FailedCommandsTotal, EntityName -> "test") shouldBe 1 + metricValue(CompletedCommandsTotal, EntityName -> "test") shouldBe 4 + metricValue(CommandProcessingTimeSeconds + "_count", EntityName -> "test") shouldBe 4 + metricValue(CommandProcessingTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + metricValue(CommandTotalTimeSeconds + "_count", EntityName -> "test") shouldBe 4 + metricValue(CommandTotalTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + // create unexpected entity failure + reactivatedEntity ! 
EntityCommand(entityId = "test", name = "command2", emptyCommand) + connection2.expect(command(3, "entity", "command2")) + connection2.send(failure(3, "boom")) + expectMsg(UserFunctionReply(clientActionFailure("Unexpected Value entity failure"))) + connection2.expectClosed() + expectTerminated(reactivatedEntity) + + metricValue(ReceivedCommandsTotal, EntityName -> "test") shouldBe 5 + metricValue(FailedCommandsTotal, EntityName -> "test") shouldBe 2 + metricValue(CompletedCommandsTotal, EntityName -> "test") shouldBe 5 + metricValue(CommandProcessingTimeSeconds + "_count", EntityName -> "test") shouldBe 5 + metricValue(CommandProcessingTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + metricValue(CommandTotalTimeSeconds + "_count", EntityName -> "test") shouldBe 5 + metricValue(CommandTotalTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + + metricValue(PassivatedEntitiesTotal, EntityName -> "test") shouldBe 2 + metricValue(EntityActiveTimeSeconds + "_count", EntityName -> "test") shouldBe 2 + metricValue(EntityActiveTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 + } + + } +} diff --git a/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/DatabaseExceptionHandlingSpec.scala b/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/DatabaseExceptionHandlingSpec.scala index 32d67f23b..bc07c9d34 100644 --- a/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/DatabaseExceptionHandlingSpec.scala +++ b/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/DatabaseExceptionHandlingSpec.scala @@ -24,12 +24,13 @@ import com.google.protobuf.any.{Any => ScalaPbAny} import com.google.protobuf.{ByteString => PbByteString} import io.cloudstate.protocol.value_entity.ValueEntityClient import io.cloudstate.proxy.entity.{EntityCommand, UserFunctionReply} -import io.cloudstate.proxy.telemetry.AbstractTelemetrySpec +import io.cloudstate.proxy.telemetry.{AbstractTelemetrySpec, CloudstateTelemetry, PrometheusEntityInstrumentation} 
import io.cloudstate.proxy.valueentity.store.Store.Key import io.cloudstate.proxy.valueentity.store.Store.Value import io.cloudstate.proxy.valueentity.store.{RepositoryImpl, Store} import io.cloudstate.testkit.TestService import io.cloudstate.testkit.valueentity.ValueEntityMessages +import io.prometheus.client.CollectorRegistry import scala.concurrent.Future import scala.concurrent.duration._ @@ -55,12 +56,17 @@ class DatabaseExceptionHandlingSpec extends AbstractTelemetrySpec { "ValueEntity" should { - "crash entity on init when loading state failures" in withTestKit(testkitConfig) { testKit => + "crash entity on init when loading state failures" in withTestRegistry(testkitConfig) { testKit => import testKit._ import system.dispatcher + import PrometheusEntityInstrumentation.MetricLabel._ + import PrometheusEntityInstrumentation.MetricName._ silentDeadLettersAndUnhandledMessages + // simulate user function interaction with value-based entity to validate instrumentation + implicit val registry: CollectorRegistry = CloudstateTelemetry(system).prometheusRegistry + val client = ValueEntityClient(GrpcClientSettings.connectToServiceAt("localhost", service.port).withTls(false)) val repository = new RepositoryImpl(TestJdbcStore.storeWithGetFailure()) @@ -68,15 +74,24 @@ class DatabaseExceptionHandlingSpec extends AbstractTelemetrySpec { val connection = service.valueEntity.expectConnection() connection.expectClosed() + + metricValue(RecoveryFailedTotal, EntityName -> "test") shouldBe 1 + metricValue(RecoveryTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(RecoveryTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 } - "crash entity on update state failures" in withTestKit(testkitConfig) { testKit => + "crash entity on update state failures" in withTestRegistry(testkitConfig) { testKit => import ValueEntityMessages._ import testKit._ import system.dispatcher + import PrometheusEntityInstrumentation.MetricLabel._ + import 
PrometheusEntityInstrumentation.MetricName._ silentDeadLettersAndUnhandledMessages + // simulate user function interaction with value-based entity to validate instrumentation + implicit val registry: CollectorRegistry = CloudstateTelemetry(system).prometheusRegistry + val forwardReply = forwardReplyActor(testActor) val client = @@ -94,15 +109,24 @@ class DatabaseExceptionHandlingSpec extends AbstractTelemetrySpec { connection.send(reply(1, EmptyJavaMessage, update(state))) expectMsg(UserFunctionReply(clientActionFailure("Unexpected Value entity failure"))) connection.expectClosed() + + metricValue(PersistFailedTotal, EntityName -> "test") shouldBe 1 + metricValue(PersistTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(PersistTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 } - "crash entity on delete state failures" in withTestKit(testkitConfig) { testKit => + "crash entity on delete state failures" in withTestRegistry(testkitConfig) { testKit => import ValueEntityMessages._ import testKit._ import system.dispatcher + import PrometheusEntityInstrumentation.MetricLabel._ + import PrometheusEntityInstrumentation.MetricName._ silentDeadLettersAndUnhandledMessages + // simulate user function interaction with value-based entity to validate instrumentation + implicit val registry: CollectorRegistry = CloudstateTelemetry(system).prometheusRegistry + val forwardReply = forwardReplyActor(testActor) val client = @@ -125,6 +149,10 @@ class DatabaseExceptionHandlingSpec extends AbstractTelemetrySpec { expectMsg(UserFunctionReply(clientActionFailure("Unexpected Value entity failure"))) connection.expectClosed() + + metricValue(DeleteFailedTotal, EntityName -> "test") shouldBe 1 + metricValue(DeleteTimeSeconds + "_count", EntityName -> "test") shouldBe 1 + metricValue(DeleteTimeSeconds + "_sum", EntityName -> "test") should be > 0.0 } } @@ -158,7 +186,6 @@ class DatabaseExceptionHandlingSpec extends AbstractTelemetrySpec { private object 
TestJdbcStore { private object JdbcStoreStatus { - val normal = "normal" val getFailure = "GetFailure" val updateFailure = "UpdateFailure" val deleteFailure = "DeleteFailure" @@ -177,7 +204,7 @@ class DatabaseExceptionHandlingSpec extends AbstractTelemetrySpec { system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*unhandled message.*"))) } - private def forwardReplyActor(actor: ActorRef)(implicit system: ActorSystem) = + private def forwardReplyActor(actor: ActorRef)(implicit system: ActorSystem): TestActorRef[Actor] = TestActorRef(new Actor { def receive: Receive = { case message => diff --git a/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/EntityPassivateSpec.scala b/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/EntityPassivateSpec.scala index ed99dca26..4650bb3e6 100644 --- a/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/EntityPassivateSpec.scala +++ b/proxy/core/src/test/scala/io/cloudstate/proxy/valueentity/EntityPassivateSpec.scala @@ -35,7 +35,7 @@ class EntityPassivateSpec extends AbstractTelemetrySpec { "ValueEntity" should { - "access the state after passivation" in withTestKit( + "access the state after passivation" in withTestRegistry( """ | include "test-in-memory" | akka { @@ -88,14 +88,12 @@ class EntityPassivateSpec extends AbstractTelemetrySpec { connection.expectClosed() expectTerminated(entity) - // recreate the entity - eventually(timeout(5.seconds), interval(100.millis)) { - val recreatedEntity = - system.actorOf(ValueEntitySupervisor.props(client, entityConfiguration, repository), "entity") - val connection2 = service.valueEntity.expectConnection() - connection2.expect(init("service", "entity", state(entityState))) - connection2.close() - } + // reactivate the entity + val reactivatedEntity = + system.actorOf(ValueEntitySupervisor.props(client, entityConfiguration, repository), "entity") + val connection2 = service.valueEntity.expectConnection() + connection2.expect(init("service", "entity", 
state(entityState))) + connection2.close() } }