Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standarised the logging between the Baker implementations and added missing MDC information #1447

Merged
merged 2 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ trait RemoteInteractionClient {

def entityCodecs: (EntityEncoder[IO, ExecutionRequest], EntityDecoder[IO, ExecutionResult], EntityDecoder[IO, Interactions])

def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance]): IO[Option[EventInstance]]
def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance], metaData: Map[String, String]): IO[Option[EventInstance]]

def interfaces(uri: Uri): IO[Interactions]
}
Expand Down Expand Up @@ -65,13 +65,13 @@ class BaseRemoteInteractionClient(
)
)

def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance]): IO[Option[EventInstance]] = {
def execute(uri: Uri, interactionId: String, input: Seq[IngredientInstance], metaData: Map[String, String]): IO[Option[EventInstance]] = {
client.expect[ExecutionResult](
Request[IO](
method = POST,
uri = uri,
headers = headers,
).withEntity(ExecutionRequest(interactionId, input.toList)))
).withEntity(ExecutionRequest(interactionId, input.toList, Some(metaData))))
.flatMap {
case InteractionExecution.ExecutionResult(Right(success)) =>
IO {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ing.bakery.interaction

import cats.effect.{ContextShift, IO, Resource, Timer}
import com.ing.baker.runtime.model.BakerLogging
import com.ing.baker.runtime.scaladsl.InteractionInstance
import com.ing.baker.runtime.serialization.InteractionExecutionJsonCodecs._
import com.ing.baker.runtime.serialization.{InteractionExecution => I}
Expand Down Expand Up @@ -88,6 +89,8 @@ abstract class InteractionExecutor extends LazyLogging {
implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)
implicit val timer: Timer[IO] = IO.timer(executionContext)

val bakerLogging: BakerLogging = BakerLogging(logger)

protected val CurrentInteractions: I.Interactions =
I.Interactions(System.currentTimeMillis,
interactions.map(interaction =>
Expand All @@ -100,28 +103,29 @@ abstract class InteractionExecutor extends LazyLogging {
message = Option(message).getOrElse("NullPointerException"))
))))


protected def execute(request: I.ExecutionRequest): IO[I.ExecutionResult] = {
logger.debug(s"Trying to execute interaction: ${request.id}")
val metadata = request.metaData.getOrElse(Map())
bakerLogging.withMDC(metadata, _.debug(s"Trying to execute interaction: ${request.id}"))
interactions.find(_.shaBase64 == request.id) match {
case Some(interaction) =>
logger.info(s"Executing interaction: ${interaction.name}")
IO.fromFuture(IO(interaction.run(request.ingredients))).attempt.flatMap {
case Right(value) => {
logger.info(s"Interaction ${interaction.name} executed correctly")
bakerLogging.withMDC(metadata, _.info(s"Executing interaction: ${interaction.name}"))
IO.fromFuture(IO(interaction.execute(request.ingredients, request.metaData.getOrElse(Map())))).attempt.flatMap {
case Right(value) =>
bakerLogging.withMDC(metadata, _.info(s"Interaction ${interaction.name} executed correctly"))
IO(I.ExecutionResult(Right(I.Success(value))))
}
case Left(e) =>
val rootCause = e match {
case _: InvocationTargetException if Option(e.getCause).isDefined => e.getCause
case _ => e
}
logger.error(s"Interaction ${interaction.name} failed with an exception: ${rootCause.getMessage}", rootCause)
bakerLogging.withMDC(metadata, _.error(s"Interaction ${interaction.name} failed with an exception: ${rootCause.getMessage}", rootCause))
executionFailure(interaction.name, rootCause.getMessage)

}
case None =>
logger.error(s"No implementation found for execution for id: ${request.id}")
bakerLogging.withMDC(metadata, _.error(s"No implementation found for execution for id: ${request.id}"))
IO(I.ExecutionResult(Left(I.Failure(I.NoInstanceFound))))

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class RemoteInteractionSpec extends BakeryFunSpec {

val implementation1 = InteractionInstance(
name = "TestInteraction1",
input = Seq(InteractionInstanceInput(Option.empty, CharArray), InteractionInstanceInput(Option.empty, Int64)),
input = Seq(InteractionInstanceInput(Option.empty, CharArray), InteractionInstanceInput(Option.empty, Int64)).toList,
run = input => Future.successful(Some(result(input.head.value.as[String] + "!", input(1).value.as[Int] + 1)))
)

Expand All @@ -125,8 +125,8 @@ class RemoteInteractionSpec extends BakeryFunSpec {
val ingredient0 = IngredientInstance("input0", PrimitiveValue("A"))
val ingredient1 = IngredientInstance("input1", PrimitiveValue(1))
for {
result0 <- client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1))
result1 <- client.execute(uri, implementation1.shaBase64, Seq(ingredient0, ingredient1))
result0 <- client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1), Map.empty)
result1 <- client.execute(uri, implementation1.shaBase64, Seq(ingredient0, ingredient1), Map.empty)
} yield {
assert(result0 === Some(result("A", 1)))
assert(result1 === Some(result("A!", 2)))
Expand All @@ -138,7 +138,7 @@ class RemoteInteractionSpec extends BakeryFunSpec {
context.withNoTrustClient(List(implementation0)) { (client, uri) =>
val ingredient0 = IngredientInstance("input0", PrimitiveValue("A"))
val ingredient1 = IngredientInstance("input1", PrimitiveValue(1))
val result: IO[Option[String]] = client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1))
val result: IO[Option[String]] = client.execute(uri, implementation0.shaBase64, Seq(ingredient0, ingredient1), Map.empty)
.map(_ => None)
.handleErrorWith {
case _: java.net.ConnectException | Command.EOF => IO.pure(Some("connection error"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import cats.syntax.all._
import com.ing.baker.runtime.akka.internal.DynamicInteractionManager
import com.ing.baker.runtime.defaultinteractions
import com.ing.baker.runtime.model.{InteractionInstance, InteractionManager}
import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance, InteractionInstanceInput}
import com.ing.baker.types.Type
import com.ing.bakery.interaction.{BaseRemoteInteractionClient, RemoteInteractionClient}
import com.ing.bakery.metrics.MetricService
import com.typesafe.config.Config
Expand All @@ -17,6 +19,7 @@ import org.http4s.{Headers, Uri}
import scalax.collection.ChainingOps

import java.io.IOException
import scala.collection.immutable.Seq
import scala.concurrent.duration.{DurationInt, FiniteDuration}

object InteractionRegistry extends LazyLogging {
Expand Down Expand Up @@ -153,13 +156,15 @@ trait RemoteInteractionDiscovery extends LazyLogging {
val interfaces = response.interactions
if (interfaces.isEmpty) logger.warn(s"${uri.toString} provides no interactions")
RemoteInteractions(response.startedAt,
interfaces.map(interaction => {
InteractionInstance.build[IO](
_name = interaction.name,
_input = interaction.input,
_output = interaction.output,
_run = input => remoteInteractions.execute(uri, interaction.id, input),
)
interfaces.map(interface => {
new InteractionInstance[IO] {
override val name: String = interface.name
override val input: Seq[InteractionInstanceInput] = interface.input
override val run: Seq[IngredientInstance] => IO[Option[EventInstance]] = _ => IO.pure(Option.empty) //Ignoring the run since the execute does not use it
override val output: Option[Map[String, Map[String, Type]]] = interface.output
override def execute(input: Seq[IngredientInstance], metadata: Map[String, String]): IO[Option[EventInstance]] =
remoteInteractions.execute(uri, interface.id, input, metadata)
}
}))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
case None => cats.effect.IO.pure(InteractionExecutionResult(Left(InteractionExecutionResult.Failure(
InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, None, None))))
case Some(interactionInstance) =>
interactionInstance.execute(ingredients)
interactionInstance.execute(ingredients, Map())
.map(executionSuccess => InteractionExecutionResult(Right(InteractionExecutionResult.Success(executionSuccess))))
.recover {
case e => InteractionExecutionResult(Left(InteractionExecutionResult.Failure(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.ing.baker.runtime.akka.actor.logging

import akka.event.EventStream
import com.ing.baker.il.petrinet.Transition
import com.ing.baker.runtime.common.{EventReceived, EventRejected, InteractionCompleted, InteractionFailed, InteractionStarted, RecipeAdded, RecipeInstanceCreated}
import com.ing.baker.runtime.model.BakerLogging

object LogAndSendEvent {

val bakerLogging: BakerLogging = BakerLogging.default
//TODO get this on startup instead of requiring for each call
//val eventStream: EventStream = context.system.eventStream

def recipeAdded(recipeAdded: RecipeAdded, eventStream: EventStream): Unit = {
eventStream.publish(recipeAdded)
bakerLogging.addedRecipe(recipeAdded)
}

def recipeInstanceCreated(recipeInstanceCreated: RecipeInstanceCreated, eventStream: EventStream): Unit = {
eventStream.publish(recipeInstanceCreated)
bakerLogging.recipeInstanceCreated(recipeInstanceCreated)
}

def interactionStarted(interactionStarted: InteractionStarted, eventStream: EventStream): Unit = {
eventStream.publish(interactionStarted)
bakerLogging.interactionStarted(interactionStarted)
}

def interactionCompleted(interactionCompleted: InteractionCompleted, eventStream: EventStream): Unit = {
eventStream.publish(interactionCompleted)
bakerLogging.interactionFinished(interactionCompleted)
}

def interactionFailed(interactionFailed: InteractionFailed, eventStream: EventStream): Unit = {
eventStream.publish(interactionFailed)
bakerLogging.interactionFailed(interactionFailed)
}

def eventReceived(eventReceived: EventReceived, eventStream: EventStream): Unit = {
eventStream.publish(eventReceived)
bakerLogging.eventReceived(eventReceived)
}

def eventRejected(eventRejected: EventRejected, eventStream: EventStream): Unit = {
eventStream.publish(eventRejected)
bakerLogging.eventRejected(eventRejected)
}

def firingEvent(recipeInstanceId: String, executionId: Long, transition: Transition, timeStarted: Long): Unit = {
//TODO This does not have a corrosponding BakerEvent, this should be created
bakerLogging.firingEvent(recipeInstanceId, executionId, transition, timeStarted)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.ing.baker.il.{CompiledRecipe, EventDescriptor}
import com.ing.baker.petrinet.api._
import com.ing.baker.runtime.akka._
import com.ing.baker.runtime.akka.actor.Util.logging._
import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndex._
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol.ExceptionStrategy.{BlockTransition, Continue, RetryWithDelay}
Expand All @@ -23,7 +24,7 @@ import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol._
import com.ing.baker.runtime.akka.actor.serialization.BakerSerializable
import com.ing.baker.runtime.akka.internal.RecipeRuntime
import com.ing.baker.runtime.common.RecipeRecord
import com.ing.baker.runtime.model.InteractionManager
import com.ing.baker.runtime.model.{BakerLogging, InteractionManager}
import com.ing.baker.runtime.recipe_manager.RecipeManager
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeInstanceCreated, RecipeInstanceState}
import com.ing.baker.runtime.serialization.Encryption
Expand Down Expand Up @@ -295,7 +296,9 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],

val actorMetadata = ActorMetadata(recipeId, recipeInstanceId, createdTime, Active)

context.system.eventStream.publish(RecipeInstanceCreated(System.currentTimeMillis(), recipeId, compiledRecipe.name, recipeInstanceId))
LogAndSendEvent.recipeInstanceCreated(
RecipeInstanceCreated(System.currentTimeMillis(), recipeId, compiledRecipe.name, recipeInstanceId),
context.system.eventStream)

index += recipeInstanceId -> actorMetadata
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.ing.baker.runtime.akka.actor.process_index
import akka.actor.{Actor, ActorLogging, ActorRef, Props, ReceiveTimeout}
import akka.sensors.actor.ActorMetrics
import com.ing.baker.il.CompiledRecipe
import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol._
Expand Down Expand Up @@ -35,14 +36,15 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
}

def notifyReceive(recipe: CompiledRecipe): Unit = {
context.system.eventStream.publish(
LogAndSendEvent.eventReceived(
EventReceived(
System.currentTimeMillis(),
recipe.name,
recipe.recipeId,
command.recipeInstanceId,
command.correlationId,
command.event))
command.event), context.system.eventStream)

command.reaction match {
case FireSensoryEventReaction.NotifyWhenCompleted(_) =>
()
Expand Down Expand Up @@ -87,13 +89,14 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
log.debug("Stopping SensoryEventResponseHandler and rejecting request")
log.debug("Reject reason: " + rejection.asReason)
log.debug("message: " + rejection)
context.system.eventStream.publish(
LogAndSendEvent.eventRejected(
EventRejected(
System.currentTimeMillis(),
command.recipeInstanceId,
command.correlationId,
command.event,
rejection.asReason))
rejection.asReason), context.system.eventStream)

command.reaction match {
case FireSensoryEventReaction.NotifyBoth(_, completeReceiver) =>
receiver ! rejection; completeReceiver ! rejection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.cluster.sharding.ShardRegion.Passivate
import akka.event.{DiagnosticLoggingAdapter, Logging}
import akka.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess}
import cats.effect.IO
import com.ing.baker.il.petrinet.Transition
import com.ing.baker.il.petrinet.{EventTransition, Transition}
import com.ing.baker.petrinet.api._
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol.FireSensoryEventRejection
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstance._
Expand All @@ -15,6 +15,7 @@ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
import com.ing.baker.runtime.akka.actor.process_instance.internal.ExceptionStrategy.{Continue, RetryWithDelay}
import com.ing.baker.runtime.akka.actor.process_instance.internal._
import com.ing.baker.runtime.akka.actor.process_instance.{ProcessInstanceProtocol => protocol}
import com.ing.baker.runtime.model.BakerLogging
import com.ing.baker.runtime.scaladsl.RecipeInstanceState
import com.ing.baker.runtime.serialization.Encryption
import com.ing.baker.types.PrimitiveValue
Expand Down Expand Up @@ -426,7 +427,11 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](

def executeJob(job: Job[P, T, S], originalSender: ActorRef): Unit = {

log.firingInteraction(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())
log.fireTransition(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())

if(job.transition.isInstanceOf[EventTransition]) {
BakerLogging.default.firingEvent(recipeInstanceId, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())
}

// context.self can be potentially throw NullPointerException in non graceful shutdown situations
Try(context.self).foreach { self =>
Expand Down
Loading