Skip to content

Commit

Permalink
Standarised the logging between the Baker implementations and added m…
Browse files Browse the repository at this point in the history
…issing MDC information (#1447)

* Made the AkkaBaker use the BakerLogging for logging Baker releated events. Added missing logs & corrected logs that where wrong.

* Added the MetaData object also to the execute of the InteractionInstance.
This can then be used inside of the implementation as a context for logging or similar use cases.
  • Loading branch information
Tim-Linschoten authored Mar 7, 2023
1 parent b3ccb54 commit 79b51c5
Show file tree
Hide file tree
Showing 26 changed files with 259 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ trait RemoteInteractionClient {

def entityCodecs: (EntityEncoder[IO, ExecutionRequest], EntityDecoder[IO, ExecutionResult], EntityDecoder[IO, Interactions])

def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance]): IO[Option[EventInstance]]
def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance], metaData: Map[String, String]): IO[Option[EventInstance]]

def interfaces(uri: Uri): IO[Interactions]
}
Expand Down Expand Up @@ -65,13 +65,13 @@ class BaseRemoteInteractionClient(
)
)

def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance]): IO[Option[EventInstance]] = {
def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance], metaData: Map[String, String]): IO[Option[EventInstance]] = {
client.expect[ExecutionResult](
Request[IO](
method = POST,
uri = uri,
headers = headers,
).withEntity(ExecutionRequest(interactionId, input.toList)))
).withEntity(ExecutionRequest(interactionId, input.toList, Some(metaData))))
.flatMap {
case InteractionExecution.ExecutionResult(Right(success)) =>
IO {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ing.bakery.interaction

import cats.effect.{ContextShift, IO, Resource, Timer}
import com.ing.baker.runtime.model.BakerLogging
import com.ing.baker.runtime.scaladsl.InteractionInstance
import com.ing.baker.runtime.serialization.InteractionExecutionJsonCodecs._
import com.ing.baker.runtime.serialization.{InteractionExecution => I}
Expand Down Expand Up @@ -88,6 +89,8 @@ abstract class InteractionExecutor extends LazyLogging {
implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)
implicit val timer: Timer[IO] = IO.timer(executionContext)

val bakerLogging: BakerLogging = BakerLogging(logger)

protected val CurrentInteractions: I.Interactions =
I.Interactions(System.currentTimeMillis,
interactions.map(interaction =>
Expand All @@ -100,28 +103,29 @@ abstract class InteractionExecutor extends LazyLogging {
message = Option(message).getOrElse("NullPointerException"))
))))


protected def execute(request: I.ExecutionRequest): IO[I.ExecutionResult] = {
logger.debug(s"Trying to execute interaction: ${request.id}")
val metadata = request.metaData.getOrElse(Map())
bakerLogging.withMDC(metadata, _.debug(s"Trying to execute interaction: ${request.id}"))
interactions.find(_.shaBase64 == request.id) match {
case Some(interaction) =>
logger.info(s"Executing interaction: ${interaction.name}")
IO.fromFuture(IO(interaction.run(request.ingredients))).attempt.flatMap {
case Right(value) => {
logger.info(s"Interaction ${interaction.name} executed correctly")
bakerLogging.withMDC(metadata, _.info(s"Executing interaction: ${interaction.name}"))
IO.fromFuture(IO(interaction.execute(request.ingredients, request.metaData.getOrElse(Map())))).attempt.flatMap {
case Right(value) =>
bakerLogging.withMDC(metadata, _.info(s"Interaction ${interaction.name} executed correctly"))
IO(I.ExecutionResult(Right(I.Success(value))))
}
case Left(e) =>
val rootCause = e match {
case _: InvocationTargetException if Option(e.getCause).isDefined => e.getCause
case _ => e
}
logger.error(s"Interaction ${interaction.name} failed with an exception: ${rootCause.getMessage}", rootCause)
bakerLogging.withMDC(metadata, _.error(s"Interaction ${interaction.name} failed with an exception: ${rootCause.getMessage}", rootCause))
executionFailure(interaction.name, rootCause.getMessage)

}
case None =>
logger.error(s"No implementation found for execution for id: ${request.id}")
bakerLogging.withMDC(metadata, _.error(s"No implementation found for execution for id: ${request.id}"))
IO(I.ExecutionResult(Left(I.Failure(I.NoInstanceFound))))

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class RemoteInteractionSpec extends BakeryFunSpec {

val implementation1 = InteractionInstance(
name = "TestInteraction1",
input = Seq(InteractionInstanceInput(Option.empty, CharArray), InteractionInstanceInput(Option.empty, Int64)),
input = Seq(InteractionInstanceInput(Option.empty, CharArray), InteractionInstanceInput(Option.empty, Int64)).toList,
run = input => Future.successful(Some(result(input.head.value.as[String] + "!", input(1).value.as[Int] + 1)))
)

Expand All @@ -125,8 +125,8 @@ class RemoteInteractionSpec extends BakeryFunSpec {
val ingredient0 = IngredientInstance("input0", PrimitiveValue("A"))
val ingredient1 = IngredientInstance("input1", PrimitiveValue(1))
for {
result0 <- client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1))
result1 <- client.execute(uri, implementation1.shaBase64, Seq(ingredient0, ingredient1))
result0 <- client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1), Map.empty)
result1 <- client.execute(uri, implementation1.shaBase64, Seq(ingredient0, ingredient1), Map.empty)
} yield {
assert(result0 === Some(result("A", 1)))
assert(result1 === Some(result("A!", 2)))
Expand All @@ -138,7 +138,7 @@ class RemoteInteractionSpec extends BakeryFunSpec {
context.withNoTrustClient(List(implementation0)) { (client, uri) =>
val ingredient0 = IngredientInstance("input0", PrimitiveValue("A"))
val ingredient1 = IngredientInstance("input1", PrimitiveValue(1))
val result: IO[Option[String]] = client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1))
val result: IO[Option[String]] = client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1), Map.empty)
.map(_ => None)
.handleErrorWith {
case _: java.net.ConnectException | Command.EOF => IO.pure(Some("connection error"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import cats.syntax.all._
import com.ing.baker.runtime.akka.internal.DynamicInteractionManager
import com.ing.baker.runtime.defaultinteractions
import com.ing.baker.runtime.model.{InteractionInstance, InteractionManager}
import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance, InteractionInstanceInput}
import com.ing.baker.types.Type
import com.ing.bakery.interaction.{BaseRemoteInteractionClient, RemoteInteractionClient}
import com.ing.bakery.metrics.MetricService
import com.typesafe.config.Config
Expand All @@ -17,6 +19,7 @@ import org.http4s.{Headers, Uri}
import scalax.collection.ChainingOps

import java.io.IOException
import scala.collection.immutable.Seq
import scala.concurrent.duration.{DurationInt, FiniteDuration}

object InteractionRegistry extends LazyLogging {
Expand Down Expand Up @@ -153,13 +156,15 @@ trait RemoteInteractionDiscovery extends LazyLogging {
val interfaces = response.interactions
if (interfaces.isEmpty) logger.warn(s"${uri.toString} provides no interactions")
RemoteInteractions(response.startedAt,
interfaces.map(interaction => {
InteractionInstance.build[IO](
_name = interaction.name,
_input = interaction.input,
_output = interaction.output,
_run = input => remoteInteractions.execute(uri, interaction.id, input),
)
interfaces.map(interface => {
new InteractionInstance[IO] {
override val name: String = interface.name
override val input: Seq[InteractionInstanceInput] = interface.input
override val run: Seq[IngredientInstance] => IO[Option[EventInstance]] = _ => IO.pure(Option.empty) //Ignoring the run since the execute does not use it
override val output: Option[Map[String, Map[String, Type]]] = interface.output
override def execute(input: Seq[IngredientInstance], metadata: Map[String, String]): IO[Option[EventInstance]] =
remoteInteractions.execute(uri, interface.id, input, metadata)
}
}))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
case None => cats.effect.IO.pure(InteractionExecutionResult(Left(InteractionExecutionResult.Failure(
InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, None, None))))
case Some(interactionInstance) =>
interactionInstance.execute(ingredients)
interactionInstance.execute(ingredients, Map())
.map(executionSuccess => InteractionExecutionResult(Right(InteractionExecutionResult.Success(executionSuccess))))
.recover {
case e => InteractionExecutionResult(Left(InteractionExecutionResult.Failure(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.ing.baker.runtime.akka.actor.logging

import akka.event.EventStream
import com.ing.baker.il.petrinet.Transition
import com.ing.baker.runtime.common.{EventReceived, EventRejected, InteractionCompleted, InteractionFailed, InteractionStarted, RecipeAdded, RecipeInstanceCreated}
import com.ing.baker.runtime.model.BakerLogging

object LogAndSendEvent {

val bakerLogging: BakerLogging = BakerLogging.default
//TODO get this on startup instead of requiring for each call
//val eventStream: EventStream = context.system.eventStream

def recipeAdded(recipeAdded: RecipeAdded, eventStream: EventStream): Unit = {
eventStream.publish(recipeAdded)
bakerLogging.addedRecipe(recipeAdded)
}

def recipeInstanceCreated(recipeInstanceCreated: RecipeInstanceCreated, eventStream: EventStream): Unit = {
eventStream.publish(recipeInstanceCreated)
bakerLogging.recipeInstanceCreated(recipeInstanceCreated)
}

def interactionStarted(interactionStarted: InteractionStarted, eventStream: EventStream): Unit = {
eventStream.publish(interactionStarted)
bakerLogging.interactionStarted(interactionStarted)
}

def interactionCompleted(interactionCompleted: InteractionCompleted, eventStream: EventStream): Unit = {
eventStream.publish(interactionCompleted)
bakerLogging.interactionFinished(interactionCompleted)
}

def interactionFailed(interactionFailed: InteractionFailed, eventStream: EventStream): Unit = {
eventStream.publish(interactionFailed)
bakerLogging.interactionFailed(interactionFailed)
}

def eventReceived(eventReceived: EventReceived, eventStream: EventStream): Unit = {
eventStream.publish(eventReceived)
bakerLogging.eventReceived(eventReceived)
}

def eventRejected(eventRejected: EventRejected, eventStream: EventStream): Unit = {
eventStream.publish(eventRejected)
bakerLogging.eventRejected(eventRejected)
}

def firingEvent(recipeInstanceId: String, executionId: Long, transition: Transition, timeStarted: Long): Unit = {
//TODO This does not have a corrosponding BakerEvent, this should be created
bakerLogging.firingEvent(recipeInstanceId, executionId, transition, timeStarted)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.ing.baker.il.{CompiledRecipe, EventDescriptor}
import com.ing.baker.petrinet.api._
import com.ing.baker.runtime.akka._
import com.ing.baker.runtime.akka.actor.Util.logging._
import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndex._
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol.ExceptionStrategy.{BlockTransition, Continue, RetryWithDelay}
Expand All @@ -23,7 +24,7 @@ import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol._
import com.ing.baker.runtime.akka.actor.serialization.BakerSerializable
import com.ing.baker.runtime.akka.internal.RecipeRuntime
import com.ing.baker.runtime.common.RecipeRecord
import com.ing.baker.runtime.model.InteractionManager
import com.ing.baker.runtime.model.{BakerLogging, InteractionManager}
import com.ing.baker.runtime.recipe_manager.RecipeManager
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeInstanceCreated, RecipeInstanceState}
import com.ing.baker.runtime.serialization.Encryption
Expand Down Expand Up @@ -295,7 +296,9 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],

val actorMetadata = ActorMetadata(recipeId, recipeInstanceId, createdTime, Active)

context.system.eventStream.publish(RecipeInstanceCreated(System.currentTimeMillis(), recipeId, compiledRecipe.name, recipeInstanceId))
LogAndSendEvent.recipeInstanceCreated(
RecipeInstanceCreated(System.currentTimeMillis(), recipeId, compiledRecipe.name, recipeInstanceId),
context.system.eventStream)

index += recipeInstanceId -> actorMetadata
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.ing.baker.runtime.akka.actor.process_index
import akka.actor.{Actor, ActorLogging, ActorRef, Props, ReceiveTimeout}
import akka.sensors.actor.ActorMetrics
import com.ing.baker.il.CompiledRecipe
import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol._
Expand Down Expand Up @@ -35,14 +36,15 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
}

def notifyReceive(recipe: CompiledRecipe): Unit = {
context.system.eventStream.publish(
LogAndSendEvent.eventReceived(
EventReceived(
System.currentTimeMillis(),
recipe.name,
recipe.recipeId,
command.recipeInstanceId,
command.correlationId,
command.event))
command.event), context.system.eventStream)

command.reaction match {
case FireSensoryEventReaction.NotifyWhenCompleted(_) =>
()
Expand Down Expand Up @@ -87,13 +89,14 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
log.debug("Stopping SensoryEventResponseHandler and rejecting request")
log.debug("Reject reason: " + rejection.asReason)
log.debug("message: " + rejection)
context.system.eventStream.publish(
LogAndSendEvent.eventRejected(
EventRejected(
System.currentTimeMillis(),
command.recipeInstanceId,
command.correlationId,
command.event,
rejection.asReason))
rejection.asReason), context.system.eventStream)

command.reaction match {
case FireSensoryEventReaction.NotifyBoth(_, completeReceiver) =>
receiver ! rejection; completeReceiver ! rejection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.cluster.sharding.ShardRegion.Passivate
import akka.event.{DiagnosticLoggingAdapter, Logging}
import akka.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess}
import cats.effect.IO
import com.ing.baker.il.petrinet.Transition
import com.ing.baker.il.petrinet.{EventTransition, Transition}
import com.ing.baker.petrinet.api._
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol.FireSensoryEventRejection
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstance._
Expand All @@ -15,6 +15,7 @@ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
import com.ing.baker.runtime.akka.actor.process_instance.internal.ExceptionStrategy.{Continue, RetryWithDelay}
import com.ing.baker.runtime.akka.actor.process_instance.internal._
import com.ing.baker.runtime.akka.actor.process_instance.{ProcessInstanceProtocol => protocol}
import com.ing.baker.runtime.model.BakerLogging
import com.ing.baker.runtime.scaladsl.RecipeInstanceState
import com.ing.baker.runtime.serialization.Encryption
import com.ing.baker.types.PrimitiveValue
Expand Down Expand Up @@ -426,7 +427,11 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](

def executeJob(job: Job[P, T, S], originalSender: ActorRef): Unit = {

log.firingInteraction(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())
log.fireTransition(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())

if(job.transition.isInstanceOf[EventTransition]) {
BakerLogging.default.firingEvent(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())
}

// context.self can be potentially throw NullPointerException in non graceful shutdown situations
Try(context.self).foreach { self =>
Expand Down
Loading

0 comments on commit 79b51c5

Please sign in to comment.