From 7fdac28caa2f27dc6a8effbcae8e2629036170e1 Mon Sep 17 00:00:00 2001 From: XK00LJ Date: Thu, 2 Mar 2023 13:32:03 +0100 Subject: [PATCH 1/2] Made the AkkaBaker use the BakerLogging for logging Baker releated events. Added missing logs & corrected logs that where wrong. --- .../akka/actor/logging/LogAndSendEvent.scala | 54 ++++++++ .../actor/process_index/ProcessIndex.scala | 7 +- .../SensoryEventResponseHandler.scala | 11 +- .../process_instance/ProcessInstance.scala | 9 +- .../ProcessInstanceLogger.scala | 62 ++++----- .../recipe_manager/RecipeManagerActor.scala | 4 +- .../runtime/akka/internal/RecipeRuntime.scala | 19 ++- .../baker/runtime/model/BakerLogging.scala | 126 ++++++++++-------- .../ing/baker/runtime/model/EventStream.scala | 2 +- .../baker/runtime/model/RecipeManager.scala | 5 +- .../model/recipeinstance/RecipeInstance.scala | 14 +- .../recipeinstance/TransitionExecution.scala | 23 ++-- http/baker-http-dashboard/package.json | 2 +- version.sbt | 2 +- 14 files changed, 205 insertions(+), 135 deletions(-) create mode 100644 core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala new file mode 100644 index 000000000..fcc6fd18a --- /dev/null +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala @@ -0,0 +1,54 @@ +package com.ing.baker.runtime.akka.actor.logging + +import akka.event.EventStream +import com.ing.baker.il.petrinet.Transition +import com.ing.baker.runtime.common.{EventReceived, EventRejected, InteractionCompleted, InteractionFailed, InteractionStarted, RecipeAdded, RecipeInstanceCreated} +import com.ing.baker.runtime.model.BakerLogging + +object LogAndSendEvent { + + val bakerLogging: BakerLogging = BakerLogging.default + //TODO get this on startup instead of requiring for each call + //val eventStream: EventStream = context.system.eventStream + + def recipeAdded(recipeAdded: RecipeAdded, eventStream: EventStream): Unit = { + eventStream.publish(recipeAdded) + bakerLogging.addedRecipe(recipeAdded) + } + + def recipeInstanceCreated(recipeInstanceCreated: RecipeInstanceCreated, eventStream: EventStream): Unit = { + eventStream.publish(recipeInstanceCreated) + bakerLogging.recipeInstanceCreated(recipeInstanceCreated) + } + + def interactionStarted(interactionStarted: InteractionStarted, eventStream: EventStream): Unit = { + eventStream.publish(interactionStarted) + bakerLogging.interactionStarted(interactionStarted) + } + + def interactionCompleted(interactionCompleted: InteractionCompleted, eventStream: EventStream): Unit = { + eventStream.publish(interactionCompleted) + bakerLogging.interactionFinished(interactionCompleted) + } + + def interactionFailed(interactionFailed: InteractionFailed, eventStream: EventStream): Unit = { + eventStream.publish(interactionFailed) + bakerLogging.interactionFailed(interactionFailed) + } + + def eventReceived(eventReceived: EventReceived, eventStream: EventStream): Unit = { + eventStream.publish(eventReceived) + bakerLogging.eventReceived(eventReceived) + } + + def eventRejected(eventRejected: EventRejected, eventStream: EventStream): Unit = { + eventStream.publish(eventRejected) + bakerLogging.eventRejected(eventRejected) + } + + def firingEvent(recipeInstanceId: String, executionId: Long, transition: Transition, timeStarted: Long): Unit = { + //TODO This does not have a corrosponding BakerEvent, this should be created + bakerLogging.firingEvent(recipeInstanceId, executionId, transition, timeStarted) + } + +} diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala index 0333bf747..f39bd83df 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala @@ -14,6 +14,7 @@ import com.ing.baker.il.{CompiledRecipe, EventDescriptor} import com.ing.baker.petrinet.api._ import com.ing.baker.runtime.akka._ import com.ing.baker.runtime.akka.actor.Util.logging._ +import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent import com.ing.baker.runtime.akka.actor.process_index.ProcessIndex._ import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol.ExceptionStrategy.{BlockTransition, Continue, RetryWithDelay} @@ -23,7 +24,7 @@ import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol._ import com.ing.baker.runtime.akka.actor.serialization.BakerSerializable import com.ing.baker.runtime.akka.internal.RecipeRuntime import com.ing.baker.runtime.common.RecipeRecord -import com.ing.baker.runtime.model.InteractionManager +import com.ing.baker.runtime.model.{BakerLogging, InteractionManager} import com.ing.baker.runtime.recipe_manager.RecipeManager import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeInstanceCreated, RecipeInstanceState} import com.ing.baker.runtime.serialization.Encryption @@ -295,7 +296,9 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration], val actorMetadata = ActorMetadata(recipeId, recipeInstanceId, createdTime, Active) - context.system.eventStream.publish(RecipeInstanceCreated(System.currentTimeMillis(), recipeId, compiledRecipe.name, recipeInstanceId)) + LogAndSendEvent.recipeInstanceCreated( + RecipeInstanceCreated(System.currentTimeMillis(), recipeId, compiledRecipe.name, recipeInstanceId), + context.system.eventStream) index += recipeInstanceId -> actorMetadata } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/SensoryEventResponseHandler.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/SensoryEventResponseHandler.scala index e70c99860..4cee68381 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/SensoryEventResponseHandler.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/SensoryEventResponseHandler.scala @@ -3,6 +3,7 @@ package com.ing.baker.runtime.akka.actor.process_index import akka.actor.{Actor, ActorLogging, ActorRef, Props, ReceiveTimeout} import akka.sensors.actor.ActorMetrics import com.ing.baker.il.CompiledRecipe +import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol._ @@ -35,14 +36,15 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing } def notifyReceive(recipe: CompiledRecipe): Unit = { - context.system.eventStream.publish( + LogAndSendEvent.eventReceived( EventReceived( System.currentTimeMillis(), recipe.name, recipe.recipeId, command.recipeInstanceId, command.correlationId, - command.event)) + command.event), context.system.eventStream) + command.reaction match { case FireSensoryEventReaction.NotifyWhenCompleted(_) => () @@ -87,13 +89,14 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing log.debug("Stopping SensoryEventResponseHandler and rejecting request") log.debug("Reject reason: " + rejection.asReason) log.debug("message: " + rejection) - context.system.eventStream.publish( + LogAndSendEvent.eventRejected( EventRejected( System.currentTimeMillis(), command.recipeInstanceId, command.correlationId, command.event, - rejection.asReason)) + rejection.asReason), context.system.eventStream) + command.reaction match { case FireSensoryEventReaction.NotifyBoth(_, completeReceiver) => receiver ! rejection; completeReceiver ! rejection diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala index 7d0c0904f..4fbafb89e 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala @@ -5,7 +5,7 @@ import akka.cluster.sharding.ShardRegion.Passivate import akka.event.{DiagnosticLoggingAdapter, Logging} import akka.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess} import cats.effect.IO -import com.ing.baker.il.petrinet.Transition +import com.ing.baker.il.petrinet.{EventTransition, Transition} import com.ing.baker.petrinet.api._ import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol.FireSensoryEventRejection import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstance._ @@ -15,6 +15,7 @@ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol import com.ing.baker.runtime.akka.actor.process_instance.internal.ExceptionStrategy.{Continue, RetryWithDelay} import com.ing.baker.runtime.akka.actor.process_instance.internal._ import com.ing.baker.runtime.akka.actor.process_instance.{ProcessInstanceProtocol => protocol} +import com.ing.baker.runtime.model.BakerLogging import com.ing.baker.runtime.scaladsl.RecipeInstanceState import com.ing.baker.runtime.serialization.Encryption import com.ing.baker.types.PrimitiveValue @@ -426,7 +427,11 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E]( def executeJob(job: Job[P, T, S], originalSender: ActorRef): Unit = { - log.firingInteraction(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis()) + log.fireTransition(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis()) + + if(job.transition.isInstanceOf[EventTransition]) { + BakerLogging.default.firingEvent(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis()) + } // context.self can be potentially throw NullPointerException in non graceful shutdown situations Try(context.self).foreach { self => diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceLogger.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceLogger.scala index 7a3c6e246..8072a9eee 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceLogger.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceLogger.scala @@ -3,6 +3,7 @@ package com.ing.baker.runtime.akka.actor.process_instance import akka.event.{DiagnosticLoggingAdapter, Logging} import com.ing.baker.il.petrinet.Transition import com.ing.baker.runtime.akka.actor.Util.logging._ +import com.ing.baker.runtime.model.BakerLogging import scala.concurrent.duration._ @@ -25,6 +26,8 @@ object ProcessInstanceLogger { implicit class LoggingAdapterFns(log: DiagnosticLoggingAdapter) { + val bakerLogging: BakerLogging = BakerLogging.default + def processHistoryDeletionSuccessful(recipeInstanceId: String, toSequenceNr: Long) = { val msg = s"Process history successfully deleted (up to event sequence $toSequenceNr), stopping the actor" @@ -48,7 +51,24 @@ object ProcessInstanceLogger { log.errorWithMDC(msg, mdc, cause) } - def transitionFired(recipeInstanceId: String, transition: Transition, jobId: Long, timeStarted: Long, timeCompleted: Long) = { + def fireTransition(recipeInstanceId: String, jobId: Long, transition: Transition, timeStarted: Long): Unit = { + val mdc = Map( + "processEvent" -> "FiringTransition", + "processId" -> recipeInstanceId, + "recipeInstanceId" -> recipeInstanceId, + "jobId" -> jobId, + "transitionId" -> transition.label, + "timeStarted" -> timeStarted + ) + val msg = s"Firing transition ${transition.label}" + log.logWithMDC(Logging.InfoLevel, msg, mdc) + } + + def transitionFired(recipeInstanceId: String, + transition: Transition, + jobId: Long, + timeStarted: Long, + timeCompleted: Long) = { val mdc = Map( "processEvent" -> "TransitionFired", @@ -75,6 +95,7 @@ object ProcessInstanceLogger { "processEvent" -> "TransitionFailed", "processId" -> recipeInstanceId, "recipeInstanceId" -> recipeInstanceId, + "jobId" -> jobId, "timeStarted" -> timeStarted, "timeFailed" -> timeFailed, "duration" -> (timeFailed - timeStarted), @@ -86,30 +107,6 @@ object ProcessInstanceLogger { log.logWithMDC(Logging.ErrorLevel, msg, mdc) } - def firingInteraction(recipeInstanceId: String, jobId: Long, transition: Transition, timeStarted: Long): Unit = { - val mdc = Map( - "processEvent" -> "FiringTransition", - "recipeInstanceEvent" -> "FiringInteraction", - "processId" -> recipeInstanceId, - "recipeInstanceId" -> recipeInstanceId, - "jobId" -> jobId, - "transitionId" -> transition.label, - "timeStarted" -> timeStarted - ) - val msg = s"Firing interaction ${transition.label}" - log.logWithMDC(Logging.InfoLevel, msg, mdc) - } - - def idleStop(recipeInstanceId: String, idleTTL: FiniteDuration): Unit = { - val mdc = Map( - "recipeInstanceId" -> recipeInstanceId, - "processId" -> recipeInstanceId, - ) - val msg = s"Instance was idle for $idleTTL, stopping the actor" - - log.logWithMDC(Logging.InfoLevel, msg, mdc) - } - def fireTransitionRejected(recipeInstanceId: String, transition: Transition, rejectReason: String): Unit = { val mdc = Map( "processEvent" -> "FireTransitionRejected", @@ -124,17 +121,12 @@ object ProcessInstanceLogger { log.logWithMDC(Logging.WarningLevel, msg, mdc) } - def scheduleRetry(recipeInstanceId: String, transition: Transition, delay: Long): Unit = { - val mdc = Map( - "processEvent" -> "TransitionRetry", - "recipeInstanceEvent" -> "InteractionRetry", - "processId" -> recipeInstanceId, - "recipeInstanceId" -> recipeInstanceId, - "transitionId" -> transition.label) - - val msg = s"Scheduling a retry of interaction '${transition.label}' in ${durationFormatter.print(new org.joda.time.Period(delay))}" + def idleStop(recipeInstanceId: String, idleTTL: FiniteDuration): Unit = { + bakerLogging.idleStop(recipeInstanceId, idleTTL) + } - log.logWithMDC(Logging.InfoLevel, msg, mdc) + def scheduleRetry(recipeInstanceId: String, transition: Transition, delay: Long): Unit = { + bakerLogging.scheduleRetry(recipeInstanceId, transition, delay) } } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/recipe_manager/RecipeManagerActor.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/recipe_manager/RecipeManagerActor.scala index 697871771..831f56dda 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/recipe_manager/RecipeManagerActor.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/recipe_manager/RecipeManagerActor.scala @@ -3,6 +3,7 @@ package com.ing.baker.runtime.akka.actor.recipe_manager import akka.actor.{ActorLogging, Props} import akka.persistence.PersistentActor import com.ing.baker.il.CompiledRecipe +import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerActor._ import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol._ import com.ing.baker.runtime.akka.actor.serialization.BakerSerializable @@ -36,8 +37,7 @@ class RecipeManagerActor extends PersistentActor with ActorLogging { val timestamp = System.currentTimeMillis() persist(RecipeAdded(compiledRecipe, timestamp)) { _ => addRecipe(compiledRecipe, timestamp) - context.system.eventStream.publish( - com.ing.baker.runtime.scaladsl.RecipeAdded(compiledRecipe.name, compiledRecipe.recipeId, timestamp, compiledRecipe)) + LogAndSendEvent.recipeAdded(com.ing.baker.runtime.scaladsl.RecipeAdded(compiledRecipe.name, compiledRecipe.recipeId, timestamp, compiledRecipe), context.system.eventStream) sender() ! AddRecipeResponse(compiledRecipe.recipeId) } } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala index eda1f2ce7..7cd97d9e1 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala @@ -1,7 +1,6 @@ package com.ing.baker.runtime.akka.internal import java.lang.reflect.InvocationTargetException - import akka.event.EventStream import cats.effect.{ContextShift, IO} import com.ing.baker.il @@ -9,6 +8,7 @@ import com.ing.baker.il.failurestrategy.ExceptionStrategyOutcome import com.ing.baker.il.petrinet._ import com.ing.baker.il.{CompiledRecipe, IngredientDescriptor} import com.ing.baker.petrinet.api._ +import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceRuntime import com.ing.baker.runtime.akka.actor.process_instance.internal.ExceptionStrategy.{BlockTransition, Continue, RetryWithDelay} import com.ing.baker.runtime.akka.actor.process_instance.internal._ @@ -159,9 +159,8 @@ class RecipeRuntime(recipe: CompiledRecipe, interactionManager: InteractionManag val currentTime = System.currentTimeMillis() - eventStream.publish( - InteractionFailed(currentTime, currentTime - startTime, recipe.name, recipe.recipeId, - job.processState.recipeInstanceId, job.transition.label, failureCount, throwable, failureStrategyOutcome)) + LogAndSendEvent.interactionFailed(InteractionFailed(currentTime, currentTime - startTime, recipe.name, recipe.recipeId, + job.processState.recipeInstanceId, job.transition.label, failureCount, throwable, failureStrategyOutcome), eventStream) // translates the recipe failure strategy to a petri net failure strategy failureStrategyOutcome match { @@ -190,19 +189,18 @@ class RecipeRuntime(recipe: CompiledRecipe, interactionManager: InteractionManag // returns a delayed task that will get executed by the process instance // add MDC values for logging - MDC.put("RecipeInstanceId", processState.recipeInstanceId) + MDC.put("recipeInstanceId", processState.recipeInstanceId) + MDC.put("recipeId", recipe.recipeId) MDC.put("recipeName", recipe.name) try { - - // create the interaction input val input = createInteractionInput(interaction, processState) val timeStarted = System.currentTimeMillis() // publish the fact that we started the interaction - eventStream.publish(InteractionStarted(timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName)) + LogAndSendEvent.interactionStarted(InteractionStarted(timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName), eventStream) // executes the interaction and obtain the (optional) output event interactionManager.execute(interaction, input, @@ -220,7 +218,7 @@ class RecipeRuntime(recipe: CompiledRecipe, interactionManager: InteractionManag val timeCompleted = System.currentTimeMillis() // publish the fact that the interaction completed - eventStream.publish(InteractionCompleted(timeCompleted, timeCompleted - timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName, outputEvent)) + LogAndSendEvent.interactionCompleted(InteractionCompleted(timeCompleted, timeCompleted - timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName, outputEvent), eventStream) // create the output marking for the petri net val outputMarking: Marking[Place] = RecipeRuntime.createProducedMarking(outAdjacent, outputEvent) @@ -230,7 +228,8 @@ class RecipeRuntime(recipe: CompiledRecipe, interactionManager: InteractionManag } finally { // remove the MDC values - MDC.remove("RecipeInstanceId") + MDC.remove("recipeInstanceId") + MDC.remove("recipeId") MDC.remove("recipeName") } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala index b4a95e44e..e28ba7560 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala @@ -1,9 +1,7 @@ package com.ing.baker.runtime.model -import cats.effect.Sync -import com.ing.baker.il.CompiledRecipe import com.ing.baker.il.petrinet.Transition -import com.ing.baker.runtime.scaladsl.EventInstance +import com.ing.baker.runtime.common._ import com.typesafe.scalalogging.Logger import org.joda.time.Period import org.joda.time.format.{PeriodFormatter, PeriodFormatterBuilder} @@ -12,8 +10,9 @@ import org.slf4j.MDC import scala.concurrent.duration.FiniteDuration object BakerLogging { - lazy val defaultLogger: Logger = Logger("com.ing.baker") + + lazy val default: BakerLogging = BakerLogging() } case class BakerLogging(logger: Logger = BakerLogging.defaultLogger) { @@ -31,99 +30,104 @@ case class BakerLogging(logger: Logger = BakerLogging.defaultLogger) { .appendSeparator(" ") .toFormatter - private def withMDC[F[_]](mdc: Map[String, String], log: Logger => Unit)(implicit effect: Sync[F]): F[Unit] = - effect.delay { + private def withMDC(mdc: Map[String, String], log: Logger => Unit): Unit = { mdc.foreach { case (k, v) => MDC.put(k, v) } log(logger) mdc.keys.foreach(MDC.remove) } - def addedRecipe[F[_]](recipe: CompiledRecipe, timestamp: Long)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Added recipe '${recipe.name}'" + def addedRecipe(recipeAdded: RecipeAdded): Unit = { + val msg = s"Added recipe '${recipeAdded.recipeName}'" val MDC = Map( - "recipeName" -> recipe.name, - "recipeId" -> recipe.recipeId, - "addedOn" -> timestamp.toString + "recipeName" -> recipeAdded.recipeName, + "recipeId" -> recipeAdded.recipeId, + "addedOn" -> recipeAdded.date.toString ) withMDC(MDC, _.info(msg)) } - def recipeInstanceCreated[F[_]](recipeInstanceId: String, createdOn: Long, recipe: CompiledRecipe)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Baked recipe instance '$recipeInstanceId' from recipe '${recipe.name}'" + def recipeInstanceCreated(recipeInstanceCreated: RecipeInstanceCreated): Unit = { + val msg = s"Baked recipe instance '${recipeInstanceCreated.recipeInstanceId}' from recipe '${recipeInstanceCreated.recipeName} : ${recipeInstanceCreated.recipeId}'" val MDC = Map( - "recipeInstanceId" -> recipeInstanceId, - "createdOn" -> createdOn.toString, - "recipeName" -> recipe.name, - "recipeId" -> recipe.recipeId + "recipeInstanceId" -> recipeInstanceCreated.recipeInstanceId, + "createdOn" -> recipeInstanceCreated.timeStamp.toString, + "recipeName" -> recipeInstanceCreated.recipeName, + "recipeId" -> recipeInstanceCreated.recipeId ) withMDC(MDC, _.info(msg)) } - def firingEvent[F[_]](recipeInstanceId: String, executionId: Long, transition: Transition, timeStarted: Long)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Firing event '${transition.label}'" + def interactionStarted(interactionStarted: InteractionStarted): Unit = { + val msg = s"Interaction started '${interactionStarted.interactionName}'" val mdc = Map( - "recipeInstanceId" -> recipeInstanceId, - "eventName" -> transition.label, - "runtimeTimestamp" -> timeStarted.toString, - "executionId" -> executionId.toString + "recipeInstanceId" -> interactionStarted.recipeInstanceId, + "interactionName" -> interactionStarted.interactionName, + "timeStarted" -> interactionStarted.timeStamp.toString, + "recipeId" -> interactionStarted.recipeId, + "recipeName" -> interactionStarted.recipeName ) withMDC(mdc, _.info(msg)) } - def interactionStarted[F[_]](recipeInstanceId: String, executionId: Long, transition: Transition, timeStarted: Long)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Interaction started '${transition.label}'" + def interactionFinished(interactionCompleted: InteractionCompleted): Unit = { + val msg = s"Interaction finished '${interactionCompleted.interactionName}'" val mdc = Map( - "recipeInstanceId" -> recipeInstanceId, - "interactionName" -> transition.label, - "timeStarted" -> timeStarted.toString, - "executionId" -> executionId.toString + "recipeInstanceId" -> interactionCompleted.recipeInstanceId, + "interactionName" -> interactionCompleted.interactionName, + "duration" -> interactionCompleted.duration.toString, + "timeFinished" -> interactionCompleted.timeStamp.toString ) withMDC(mdc, _.info(msg)) } - def interactionFinished[F[_]](recipeInstanceId: String, executionId: Long, transition: Transition, timeStarted: Long, timeFinished: Long)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Interaction finished '${transition.label}'" + def interactionFailed(interactionFailed: InteractionFailed): Unit = { + val msg = s"Interaction failed '${interactionFailed.interactionName}'" val mdc = Map( - "recipeInstanceId" -> recipeInstanceId, - "interactionName" -> transition.label, - "duration" -> (timeFinished - timeStarted).toString, - "timeStarted" -> timeStarted.toString, - "timeFinished" -> timeFinished.toString, - "executionId" -> executionId.toString + "recipeInstanceId" -> interactionFailed.recipeInstanceId, + "interactionName" -> interactionFailed.interactionName, + "duration" -> interactionFailed.duration.toString, + "timeFailed" -> interactionFailed.timeStamp.toString, + "recipeId" -> interactionFailed.recipeId, + "recipeName" -> interactionFailed.recipeName, + "failureCount" -> interactionFailed.failureCount.toString ) - withMDC(mdc, _.info(msg)) + withMDC(mdc, _.error(msg, interactionFailed.throwable)) } - def interactionFailed[F[_]](recipeInstanceId: String, transition: Transition, executionId: Long, timeStarted: Long, timeFailed: Long, failureReason: Throwable)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Interaction failed '${transition.label}'" + def firingEvent(recipeInstanceId: String, executionId: Long, transition: Transition, timeStarted: Long): Unit = { + val msg = s"Firing event '${transition.label}'" val mdc = Map( "recipeInstanceId" -> recipeInstanceId, - "interactionName" -> transition.label, - "duration" -> (timeFailed - timeStarted).toString, - "timeStarted" -> timeStarted.toString, - "timeFailed" -> timeFailed.toString, - "executionId" -> executionId.toString, + "eventName" -> transition.label, + "runtimeTimestamp" -> timeStarted.toString, + "executionId" -> executionId.toString ) - withMDC(mdc, _.error(msg, failureReason)) + withMDC(mdc, _.info(msg)) } - def idleStop[F[_]](recipeInstanceId: String, idleTTL: FiniteDuration)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Instance was idle for $idleTTL" - val mdc = Map("recipeInstanceId" -> recipeInstanceId) - withMDC(mdc, _.info(msg)) + def eventReceived(eventReceived: EventReceived): Unit = { + val msg = s"Event received '${eventReceived.event.name}'" + val mdc = Map( + "event" -> eventReceived.event.name, + "recipeInstanceId" -> eventReceived.recipeInstanceId, + "recipeId" -> eventReceived.recipeId, + "recipeName" -> eventReceived.recipeName, + "timeReceived" -> eventReceived.timeStamp.toString, + ) + withMDC(mdc, _.warn(msg)) } - def eventRejected[F[_]](recipeInstanceId: String, event: EventInstance, rejectReason: String)(implicit effect: Sync[F]): F[Unit] = { - val msg = s"Event rejected '${event.name}' because: $rejectReason" + def eventRejected(eventRejected: EventRejected): Unit = { + val msg = s"Event rejected '${eventRejected.event.name}' because: ${eventRejected.reason}" val mdc = Map( - "recipeInstanceId" -> recipeInstanceId, - "event" -> event.name, - "rejectReason" -> rejectReason + "event" -> eventRejected.event.name, + "recipeInstanceId" -> eventRejected.recipeInstanceId, + "timeReceived" -> eventRejected.timeStamp.toString, ) withMDC(mdc, _.warn(msg)) } - def scheduleRetry[F[_]](recipeInstanceId: String, transition: Transition, delay: Long)(implicit effect: Sync[F]): F[Unit] = { + def scheduleRetry(recipeInstanceId: String, transition: Transition, delay: Long): Unit = { val msg = s"Scheduling a retry of interaction '${transition.label}' in ${durationFormatter.print(new Period(delay))}" val mdc = Map( "recipeInstanceId" -> recipeInstanceId, @@ -133,6 +137,12 @@ case class BakerLogging(logger: Logger = BakerLogging.defaultLogger) { withMDC(mdc, _.info(msg)) } - def exceptionOnEventListener[F[_]](throwable: Throwable)(implicit effect: Sync[F]): F[Unit] = - effect.delay(logger.error("Exception on event listener", throwable)) + def idleStop(recipeInstanceId: String, idleTTL: FiniteDuration): Unit = { + val msg = s"Instance was idle for $idleTTL" + val mdc = Map("recipeInstanceId" -> recipeInstanceId) + withMDC(mdc, _.info(msg)) + } + + def exceptionOnEventListener(throwable: Throwable): Unit = + logger.error("Exception on event listener", throwable) } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/EventStream.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/EventStream.scala index 82e63c9a8..4b784aaa3 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/EventStream.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/EventStream.scala @@ -16,7 +16,7 @@ trait EventStream[F[_]] { _ <- listeners.traverse { listener => effect.runAsync(effect.delay(listener(event))) { case Right(_) => IO.unit - case Left(e) => effect.toIO(components.logging.exceptionOnEventListener(e)) + case Left(e) => effect.toIO(effect.delay(components.logging.exceptionOnEventListener(e))) } }.to[F] } yield () diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala index 0836963a1..582f867d5 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala @@ -38,8 +38,9 @@ trait RecipeManager[F[_]] extends LazyLogging { for { timestamp <- timer.clock.realTime(duration.MILLISECONDS) _ <- store(compiledRecipe, timestamp) - _ <- components.logging.addedRecipe(compiledRecipe, timestamp) - _ <- components.eventStream.publish(RecipeAdded(compiledRecipe.name, compiledRecipe.recipeId, timestamp, compiledRecipe)) + event = RecipeAdded(compiledRecipe.name, compiledRecipe.recipeId, timestamp, compiledRecipe) + _ <- effect.delay(components.logging.addedRecipe(event)) + _ <- components.eventStream.publish(event) } yield () } yield compiledRecipe.recipeId diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala index 2d2259449..915502b91 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala @@ -24,8 +24,9 @@ object RecipeInstance { for { timestamp <- timer.clock.realTime(MILLISECONDS) state <- Ref.of[F, RecipeInstanceState](RecipeInstanceState.empty(recipeInstanceId, recipe, timestamp)) - _ <- components.logging.recipeInstanceCreated(recipeInstanceId, timestamp, recipe) - _ <- components.eventStream.publish(RecipeInstanceCreated(timestamp, recipe.recipeId, recipe.name, recipeInstanceId)) + recipeInstanceCreated = RecipeInstanceCreated(timestamp, recipe.recipeId, recipe.name, recipeInstanceId) + _ <- effect.delay(components.logging.recipeInstanceCreated(recipeInstanceCreated)) + _ <- components.eventStream.publish(recipeInstanceCreated) } yield RecipeInstance(recipeInstanceId, settings, state) class FatalInteractionException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) @@ -40,8 +41,9 @@ case class RecipeInstance[F[_]](recipeInstanceId: String, config: RecipeInstance initialExecution <- EitherT.fromEither[F](currentState.validateExecution(input, correlationId, currentTime)) .leftSemiflatMap { case (rejection, reason) => for { - _ <- components.logging.eventRejected(recipeInstanceId, input, reason) - _ <- components.eventStream.publish(EventRejected(currentTime, recipeInstanceId, correlationId, input, rejection.asReason)) + event <- effect.delay(EventRejected(currentTime, recipeInstanceId, correlationId, input, rejection.asReason)) + _ <- effect.delay(components.logging.eventRejected(event)) + _ <- components.eventStream.publish(event) } yield rejection } _ <- EitherT.liftF(components.eventStream.publish(EventReceived(currentTime, currentState.recipe.name, currentState.recipe.recipeId, recipeInstanceId, correlationId, input))) @@ -116,7 +118,7 @@ case class RecipeInstance[F[_]](recipeInstanceId: String, config: RecipeInstance _ <- state.update(_ .recordFailedExecution(finishedExecution, strategy) .addRetryingExecution(finishedExecution.id)) - _ <- components.logging.scheduleRetry(recipeInstanceId, finishedExecution.transition, delay) + _ <- effect.delay(components.logging.scheduleRetry(recipeInstanceId, finishedExecution.transition, delay)) finalOutcome <- timer.sleep(delay.milliseconds) *> state.get.flatMap { currentState => if (currentState.retryingExecutions.contains(finishedExecution.id)) { val currentTransitionExecution = currentState.executions(finishedExecution.id) @@ -145,7 +147,7 @@ case class RecipeInstance[F[_]](recipeInstanceId: String, config: RecipeInstance state.get.flatMap { currentState => if (currentState.isInactive && currentState.sequenceNumber == sequenceNumber) components.recipeInstanceManager.idleStop(recipeInstanceId) *> - components.logging.idleStop(recipeInstanceId, originalIdleTTL) + effect.delay(components.logging.idleStop(recipeInstanceId, originalIdleTTL)) else effect.unit } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala index af5443705..f3cbc2cc9 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala @@ -100,7 +100,7 @@ private[recipeinstance] case class TransitionExecution( case _: EventTransition => for { timerstamp <- timer.clock.realTime(MILLISECONDS) - _ <- components.logging.firingEvent(recipeInstanceId, id, transition, timerstamp) + _ <- effect.delay(components.logging.firingEvent(recipeInstanceId, id, transition, timerstamp)) } yield input case _ => effect.pure(None) @@ -155,20 +155,20 @@ private[recipeinstance] case class TransitionExecution( outcome <- { for { - _ <- components.logging.interactionStarted(recipeInstanceId, id, transition, startTime) - _ <- components.eventStream.publish(InteractionStarted( - startTime, recipe.name, recipe.recipeId, - recipeInstanceId, interactionTransition.interactionName)) + event <- effect.delay(InteractionStarted(startTime, recipe.name, recipe.recipeId, recipeInstanceId, interactionTransition.interactionName)) + _ <- effect.delay(components.logging.interactionStarted(event)) + _ <- components.eventStream.publish(event) interactionOutput <- effect.bracket(setupMdc)(_ => execute)(_ => cleanMdc) _ <- validateInteractionOutput(interactionTransition, interactionOutput) transformedOutput = interactionOutput.map(_.transformWith(interactionTransition)) endTime <- timer.clock.realTime(MILLISECONDS) - _ <- components.logging.interactionFinished(recipeInstanceId, id, transition, startTime, endTime) - _ <- components.eventStream.publish(InteractionCompleted( + event = InteractionCompleted( endTime, endTime - startTime, recipe.name, recipe.recipeId, recipeInstanceId, - interactionTransition.interactionName, transformedOutput)) + interactionTransition.interactionName, transformedOutput) + _ <- effect.delay(components.logging.interactionFinished(event)) + _ <- components.eventStream.publish(event) } yield transformedOutput }.onError { case e: Throwable => @@ -179,10 +179,11 @@ private[recipeinstance] case class TransitionExecution( } for { endTime <- timer.clock.realTime(MILLISECONDS) - _ <- components.logging.interactionFailed(recipeInstanceId, transition, id, startTime, endTime, throwable) - _ <- components.eventStream.publish(InteractionFailed( + event = InteractionFailed( endTime, endTime - startTime, recipe.name, recipe.recipeId, recipeInstanceId, - transition.label, failureCount, throwable, interactionTransition.failureStrategy.apply(failureCount + 1))) + transition.label, failureCount, throwable, interactionTransition.failureStrategy.apply(failureCount + 1)) + _ <- effect.delay(components.logging.interactionFailed(event)) + _ <- components.eventStream.publish(event) } yield () } diff --git a/http/baker-http-dashboard/package.json b/http/baker-http-dashboard/package.json index c43d69886..2971c2535 100644 --- a/http/baker-http-dashboard/package.json +++ b/http/baker-http-dashboard/package.json @@ -36,7 +36,7 @@ "@angular-eslint/eslint-plugin-template": "14.0.2", "@angular-eslint/schematics": "14.0.2", "@angular-eslint/template-parser": "14.0.2", - "@angular/cli": "^14.2.9", + "@angular/cli": "^14.2.10", "@angular/compiler-cli": "^14.0.6", "@angular/language-service": "^14.0.6", "@angular/localize": "^14.0.6", diff --git a/version.sbt b/version.sbt index 6a4d7b465..ee2988545 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "3.7.1-SNAPSHOT" +ThisBuild / version := "3.7.2-SNAPSHOT" From 6aeddf0916979b0bcb7e8a329f1aa5effc3ba41b Mon Sep 17 00:00:00 2001 From: XK00LJ Date: Tue, 7 Mar 2023 09:22:21 +0100 Subject: [PATCH 2/2] Added the MetaData object also to the execute of the InteractionInstance. This can then be used inside of the implementation as a context for logging or similar use cases. --- .../interaction/RemoteInteractionClient.scala | 6 ++--- .../RemoteInteractionService.scala | 22 +++++++++++-------- .../interaction/RemoteInteractionSpec.scala | 8 +++---- .../components/InteractionRegistry.scala | 19 ++++++++++------ .../ing/baker/runtime/akka/AkkaBaker.scala | 2 +- .../runtime/common/InteractionInstance.scala | 2 +- .../runtime/javadsl/InteractionInstance.scala | 18 ++++++++++----- .../com/ing/baker/runtime/model/BakerF.scala | 2 +- .../baker/runtime/model/BakerLogging.scala | 2 +- .../runtime/model/InteractionInstance.scala | 3 ++- .../runtime/model/InteractionManager.scala | 2 +- .../recipeinstance/TransitionExecution.scala | 4 ++-- .../serialization/InteractionExecution.scala | 2 +- .../http/client/scaladsl/BakerClient.scala | 2 +- 14 files changed, 55 insertions(+), 39 deletions(-) diff --git a/bakery/interaction-protocol/src/main/scala/com/ing/bakery/interaction/RemoteInteractionClient.scala b/bakery/interaction-protocol/src/main/scala/com/ing/bakery/interaction/RemoteInteractionClient.scala index b5983c8f6..cd4d42c4d 100644 --- a/bakery/interaction-protocol/src/main/scala/com/ing/bakery/interaction/RemoteInteractionClient.scala +++ b/bakery/interaction-protocol/src/main/scala/com/ing/bakery/interaction/RemoteInteractionClient.scala @@ -35,7 +35,7 @@ trait RemoteInteractionClient { def entityCodecs: (EntityEncoder[IO, ExecutionRequest], EntityDecoder[IO, ExecutionResult], EntityDecoder[IO, Interactions]) - def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance]): IO[Option[EventInstance]] + def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance], metaData: Map[String, String]): IO[Option[EventInstance]] def interfaces(uri: Uri): IO[Interactions] } @@ -65,13 +65,13 @@ class BaseRemoteInteractionClient( ) ) - def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance]): IO[Option[EventInstance]] = { + def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance], metaData: Map[String, String]): IO[Option[EventInstance]] = { client.expect[ExecutionResult]( Request[IO]( method = POST, uri = uri, headers = headers, - ).withEntity(ExecutionRequest(interactionId, input.toList))) + ).withEntity(ExecutionRequest(interactionId, input.toList, Some(metaData)))) .flatMap { case InteractionExecution.ExecutionResult(Right(success)) => IO { diff --git a/bakery/interaction/src/main/scala/com/ing/bakery/interaction/RemoteInteractionService.scala b/bakery/interaction/src/main/scala/com/ing/bakery/interaction/RemoteInteractionService.scala index ad5751e23..126ba6419 100644 --- a/bakery/interaction/src/main/scala/com/ing/bakery/interaction/RemoteInteractionService.scala +++ b/bakery/interaction/src/main/scala/com/ing/bakery/interaction/RemoteInteractionService.scala @@ -1,6 +1,7 @@ package com.ing.bakery.interaction import cats.effect.{ContextShift, IO, Resource, Timer} +import com.ing.baker.runtime.model.BakerLogging import com.ing.baker.runtime.scaladsl.InteractionInstance import com.ing.baker.runtime.serialization.InteractionExecutionJsonCodecs._ import com.ing.baker.runtime.serialization.{InteractionExecution => I} @@ -88,6 +89,8 @@ abstract class InteractionExecutor extends LazyLogging { implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext) implicit val timer: Timer[IO] = IO.timer(executionContext) + val bakerLogging: BakerLogging = BakerLogging(logger) + protected val CurrentInteractions: I.Interactions = I.Interactions(System.currentTimeMillis, interactions.map(interaction => @@ -100,28 +103,29 @@ abstract class InteractionExecutor extends LazyLogging { message = Option(message).getOrElse("NullPointerException")) )))) - protected def execute(request: I.ExecutionRequest): IO[I.ExecutionResult] = { - logger.debug(s"Trying to execute interaction: ${request.id}") + val metadata = request.metaData.getOrElse(Map()) + bakerLogging.withMDC(metadata, _.debug(s"Trying to execute interaction: ${request.id}")) interactions.find(_.shaBase64 == request.id) match { case Some(interaction) => - logger.info(s"Executing interaction: ${interaction.name}") - IO.fromFuture(IO(interaction.run(request.ingredients))).attempt.flatMap { - case Right(value) => { - logger.info(s"Interaction ${interaction.name} executed correctly") + bakerLogging.withMDC(metadata, _.info(s"Executing interaction: ${interaction.name}")) + IO.fromFuture(IO(interaction.execute(request.ingredients, request.metaData.getOrElse(Map())))).attempt.flatMap { + case Right(value) => + bakerLogging.withMDC(metadata, _.info(s"Interaction ${interaction.name} executed correctly")) IO(I.ExecutionResult(Right(I.Success(value)))) - } case Left(e) => val rootCause = e match { case _: InvocationTargetException if Option(e.getCause).isDefined => e.getCause case _ => e } - logger.error(s"Interaction ${interaction.name} failed with an exception: ${rootCause.getMessage}", rootCause) + bakerLogging.withMDC(metadata, _.error(s"Interaction ${interaction.name} failed with an exception: ${rootCause.getMessage}", rootCause)) executionFailure(interaction.name, rootCause.getMessage) + } case None => - logger.error(s"No implementation found for execution for id: ${request.id}") + bakerLogging.withMDC(metadata, _.error(s"No implementation found for execution for id: ${request.id}")) IO(I.ExecutionResult(Left(I.Failure(I.NoInstanceFound)))) + } } } diff --git a/bakery/interaction/src/test/scala/com/ing/bakery/interaction/RemoteInteractionSpec.scala b/bakery/interaction/src/test/scala/com/ing/bakery/interaction/RemoteInteractionSpec.scala index cfe53c273..4d27da565 100644 --- a/bakery/interaction/src/test/scala/com/ing/bakery/interaction/RemoteInteractionSpec.scala +++ b/bakery/interaction/src/test/scala/com/ing/bakery/interaction/RemoteInteractionSpec.scala @@ -105,7 +105,7 @@ class RemoteInteractionSpec extends BakeryFunSpec { val implementation1 = InteractionInstance( name = "TestInteraction1", - input = Seq(InteractionInstanceInput(Option.empty, CharArray), InteractionInstanceInput(Option.empty, Int64)), + input = Seq(InteractionInstanceInput(Option.empty, CharArray), InteractionInstanceInput(Option.empty, Int64)).toList, run = input => Future.successful(Some(result(input.head.value.as[String] + "!", input(1).value.as[Int] + 1))) ) @@ -125,8 +125,8 @@ class RemoteInteractionSpec extends BakeryFunSpec { val ingredient0 = IngredientInstance("input0", PrimitiveValue("A")) val ingredient1 = IngredientInstance("input1", PrimitiveValue(1)) for { - result0 <- client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1)) - result1 <- client.execute(uri, implementation1.shaBase64, Seq(ingredient0, ingredient1)) + result0 <- client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1), Map.empty) + result1 <- client.execute(uri, implementation1.shaBase64, Seq(ingredient0, ingredient1), Map.empty) } yield { assert(result0 === Some(result("A", 1))) assert(result1 === Some(result("A!", 2))) @@ -138,7 +138,7 @@ class RemoteInteractionSpec extends BakeryFunSpec { context.withNoTrustClient(List(implementation0)) { (client, uri) => val ingredient0 = IngredientInstance("input0", PrimitiveValue("A")) val ingredient1 = IngredientInstance("input1", PrimitiveValue(1)) - val result: IO[Option[String]] = client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1)) + val result: IO[Option[String]] = client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1), Map.empty) .map(_ => None) .handleErrorWith { case _: java.net.ConnectException | Command.EOF => IO.pure(Some("connection error")) diff --git a/bakery/state/src/main/scala/com/ing/bakery/components/InteractionRegistry.scala b/bakery/state/src/main/scala/com/ing/bakery/components/InteractionRegistry.scala index 1de324662..e8faa98a4 100644 --- a/bakery/state/src/main/scala/com/ing/bakery/components/InteractionRegistry.scala +++ b/bakery/state/src/main/scala/com/ing/bakery/components/InteractionRegistry.scala @@ -7,6 +7,8 @@ import cats.syntax.all._ import com.ing.baker.runtime.akka.internal.DynamicInteractionManager import com.ing.baker.runtime.defaultinteractions import com.ing.baker.runtime.model.{InteractionInstance, InteractionManager} +import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance, InteractionInstanceInput} +import com.ing.baker.types.Type import com.ing.bakery.interaction.{BaseRemoteInteractionClient, RemoteInteractionClient} import com.ing.bakery.metrics.MetricService import com.typesafe.config.Config @@ -17,6 +19,7 @@ import org.http4s.{Headers, Uri} import scalax.collection.ChainingOps import java.io.IOException +import scala.collection.immutable.Seq import scala.concurrent.duration.{DurationInt, FiniteDuration} object InteractionRegistry extends LazyLogging { @@ -153,13 +156,15 @@ trait RemoteInteractionDiscovery extends LazyLogging { val interfaces = response.interactions if (interfaces.isEmpty) logger.warn(s"${uri.toString} provides no interactions") RemoteInteractions(response.startedAt, - interfaces.map(interaction => { - InteractionInstance.build[IO]( - _name = interaction.name, - _input = interaction.input, - _output = interaction.output, - _run = input => remoteInteractions.execute(uri, interaction.id, input), - ) + interfaces.map(interface => { + new InteractionInstance[IO] { + override val name: String = interface.name + override val input: Seq[InteractionInstanceInput] = interface.input + override val run: Seq[IngredientInstance] => IO[Option[EventInstance]] = _ => IO.pure(Option.empty) //Ignoring the run since the execute does not use it + override val output: Option[Map[String, Map[String, Type]]] = interface.output + override def execute(input: Seq[IngredientInstance], metadata: Map[String, String]): IO[Option[EventInstance]] = + remoteInteractions.execute(uri, interface.id, input, metadata) + } })) } } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala index 59c96bcea..d9e96eeca 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala @@ -181,7 +181,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker case None => cats.effect.IO.pure(InteractionExecutionResult(Left(InteractionExecutionResult.Failure( InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, None, None)))) case Some(interactionInstance) => - interactionInstance.execute(ingredients) + interactionInstance.execute(ingredients, Map()) .map(executionSuccess => InteractionExecutionResult(Right(InteractionExecutionResult.Success(executionSuccess)))) .recover { case e => InteractionExecutionResult(Left(InteractionExecutionResult.Failure( diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/InteractionInstance.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/InteractionInstance.scala index 96f1b2f98..2e2ee8cfc 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/InteractionInstance.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/InteractionInstance.scala @@ -38,5 +38,5 @@ trait InteractionInstance[F[_]] extends LanguageApi { self => * Note: The input is a sequence of ingredients because there can be 2 ingredients with the same name, e.g. when * 2 ingredients get renamed to the same name. */ - def execute(input: language.Seq[Ingredient]): F[language.Option[Event]] + def execute(input: language.Seq[Ingredient], metaData: Map[String, String]): F[language.Option[Event]] } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/InteractionInstance.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/InteractionInstance.scala index 6de227267..c85a2966b 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/InteractionInstance.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/InteractionInstance.scala @@ -31,11 +31,13 @@ abstract class InteractionInstance extends common.InteractionInstance[Completabl override val output: Optional[util.Map[String, util.Map[String, Type]]] = Optional.empty() - override def execute(input: util.List[IngredientInstance]): CompletableFuture[Optional[EventInstance]] + def run(input: util.List[IngredientInstance]): CompletableFuture[Optional[EventInstance]] + + override def execute(input: util.List[IngredientInstance], metadata: Map[String, String]): CompletableFuture[Optional[EventInstance]] @nowarn - private def wrapExecuteToFuture(input: Seq[scaladsl.IngredientInstance]): Future[Option[scaladsl.EventInstance]] = { - FutureConverters.toScala(execute(input.map(_.asJava).asJava) + private def wrapRunToFuture(input: Seq[scaladsl.IngredientInstance]): Future[Option[scaladsl.EventInstance]] = { + FutureConverters.toScala(run(input.map(_.asJava).asJava) .thenApply[Option[scaladsl.EventInstance]] { optional => if (optional.isPresent) Some(optional.get().asScala) @@ -53,7 +55,7 @@ abstract class InteractionInstance extends common.InteractionInstance[Completabl scaladsl.InteractionInstance( name, input.asScala.map(input => input.asScala).toIndexedSeq, - input => wrapExecuteToFuture(input), + input => wrapRunToFuture(input), outputOrNone ) } @@ -63,7 +65,7 @@ abstract class InteractionInstance extends common.InteractionInstance[Completabl model.InteractionInstance.build( name, input.asScala.map(input => input.asScala).toIndexedSeq, - input => IO.fromFuture(IO(wrapExecuteToFuture(input)))(cs), + input => IO.fromFuture(IO(wrapRunToFuture(input)))(cs), outputOrNone ) } @@ -95,11 +97,15 @@ object InteractionInstance { case Some(out) => Optional.of(out.view.map { case (key, value) => (key, value.asJava)}.toMap.asJava) case None => Optional.empty[util.Map[String, util.Map[String, Type]]]() } - override def execute(input: util.List[javadsl.IngredientInstance]): CompletableFuture[Optional[javadsl.EventInstance]] = + + override def run(input: util.List[javadsl.IngredientInstance]): CompletableFuture[Optional[javadsl.EventInstance]] = converter(common.run(input.asScala.map(_.asScala).toIndexedSeq)) .thenApply( _.fold(Optional.empty[javadsl.EventInstance]())( e => Optional.of(e.asJava))) + + override def execute(input: util.List[javadsl.IngredientInstance], metadata: Map[String, String]): CompletableFuture[Optional[javadsl.EventInstance]] = + run(input) } } } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerF.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerF.scala index 46a350d73..66b3842c1 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerF.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerF.scala @@ -132,7 +132,7 @@ abstract class BakerF[F[_]](implicit components: BakerComponents[F], effect: Con case None => effect.pure(InteractionExecutionResult(Left(InteractionExecutionResult.Failure( InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, None, None)))) case Some(interactionInstance) => - interactionInstance.execute(ingredients) + interactionInstance.execute(ingredients, Map.empty) .map(executionSuccess => InteractionExecutionResult(Right(InteractionExecutionResult.Success(executionSuccess)))) .recover { case e => InteractionExecutionResult(Left(InteractionExecutionResult.Failure( diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala index e28ba7560..1012621fa 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala @@ -30,7 +30,7 @@ case class BakerLogging(logger: Logger = BakerLogging.defaultLogger) { .appendSeparator(" ") .toFormatter - private def withMDC(mdc: Map[String, String], log: Logger => Unit): Unit = { + def withMDC(mdc: Map[String, String], log: Logger => Unit): Unit = { mdc.foreach { case (k, v) => MDC.put(k, v) } log(logger) mdc.keys.foreach(MDC.remove) diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionInstance.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionInstance.scala index 8409cc7a3..21b80a7c5 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionInstance.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionInstance.scala @@ -30,7 +30,8 @@ abstract class InteractionInstance[F[_]] extends common.InteractionInstance[F] w override type Input = InteractionInstanceInput - override def execute(input: Seq[IngredientInstance]): F[Option[Event]] = + //By default the metadata is not used but is given so implementation can overwrite it + override def execute(input: Seq[IngredientInstance], metadata: Map[String, String]): F[Option[Event]] = run(input) def shaBase64: String = { diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala index ac6f1f071..b778f59f3 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala @@ -43,7 +43,7 @@ trait InteractionManager[F[_]] { def execute(interaction: InteractionTransition, input: Seq[IngredientInstance], metadata: Option[Map[String, String]])(implicit sync: Sync[F], effect: MonadError[F, Throwable]): F[Option[EventInstance]] = findFor(interaction) .flatMap { - case Some(implementation) => implementation.execute(input) + case Some(implementation) => implementation.execute(input, metadata.getOrElse(Map())) case None => effect.raiseError(new FatalInteractionException(s"No implementation available for interaction ${interaction.interactionName}")) } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala index f3cbc2cc9..b23a02415 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala @@ -135,12 +135,12 @@ private[recipeinstance] case class TransitionExecution( } def setupMdc: F[Unit] = effect.delay { - MDC.put("RecipeInstanceId", recipeInstanceId) + MDC.put("recipeInstanceId", recipeInstanceId) MDC.put("recipeName", recipe.name) } def cleanMdc: F[Unit] = effect.delay { - MDC.remove("RecipeInstanceId") + MDC.remove("recipeInstanceId") MDC.remove("recipeName") } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/InteractionExecution.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/InteractionExecution.scala index 795d52655..40da8d0b7 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/InteractionExecution.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/InteractionExecution.scala @@ -39,7 +39,7 @@ object InteractionExecution { case class Interactions(startedAt: Long, interactions: List[Descriptor]) case class Descriptor(id: String, name: String, input: Seq[InteractionInstanceInput], output: Option[Map[String, Map[String, Type]]]) - case class ExecutionRequest(id: String, ingredients: List[IngredientInstance]) + case class ExecutionRequest(id: String, ingredients: List[IngredientInstance], metaData: Option[Map[String, String]]) case class ExecutionResult(outcome: Either[Failure, Success]) sealed trait Result diff --git a/http/baker-http-client/src/main/scala/com/ing/baker/http/client/scaladsl/BakerClient.scala b/http/baker-http-client/src/main/scala/com/ing/baker/http/client/scaladsl/BakerClient.scala index 9391a8798..3f5634dac 100644 --- a/http/baker-http-client/src/main/scala/com/ing/baker/http/client/scaladsl/BakerClient.scala +++ b/http/baker-http-client/src/main/scala/com/ing/baker/http/client/scaladsl/BakerClient.scala @@ -196,7 +196,7 @@ final class BakerClient( client: Client[IO], override def executeSingleInteraction(interactionId: String, ingredients: Seq[IngredientInstance]): Future[InteractionExecutionResult] = callRemoteBakerService[InteractionExecution.ExecutionResult]((host, prefix) => - POST(InteractionExecution.ExecutionRequest(interactionId, ingredients.toList), root(host, prefix) / "app" / "interactions" / "execute")).map{ result => + POST(InteractionExecution.ExecutionRequest(interactionId, ingredients.toList, Option.empty), root(host, prefix) / "app" / "interactions" / "execute")).map{ result => result.outcome match { case Left(failure) => InteractionExecutionResult(Left(failure.toBakerInteractionExecutionFailure)) case Right(success) => InteractionExecutionResult(Right(success.toBakerInteractionExecutionSuccess))