Skip to content

Commit

Permalink
Improved on the Ingredient filter. Now also filters on the event stre…
Browse files Browse the repository at this point in the history
…am itself. Made a split between the filter used for the event stream and the GetIngredients.
  • Loading branch information
Tim-Linschoten committed May 2, 2023
1 parent 56da7cb commit d8197e8
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 44 deletions.
6 changes: 6 additions & 0 deletions core/akka-runtime/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ baker {
# The ingredients will be in the ingredients map but there value will be an empty String.
filtered-ingredient-values = []

# Values to filter specifically for the GetIngredients call, this is combined with the filtered-ingredient-values
filtered-ingredient-values-for-get = []

# Values to filter specifically used for the internal event stream, this is combined with the filtered-ingredient-values
filtered-ingredient-values-for-stream = []

# Determines if a recipe can be added if not all needed interaction instances are available.
allow-adding-recipe-without-requiring-instances = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ object AkkaBakerConfig extends LazyLogging {
val localProvider =
new LocalBakerActorProvider(
retentionCheckInterval = 1.minute,
ingredientsFilter = List.empty,
getIngredientsFilter = List.empty,
providedIngredientFilter = List.empty,
actorIdleTimeout = Some(5.minutes),
configuredEncryption = Encryption.NoEncryption,
timeouts = defaultTimeouts
Expand All @@ -102,7 +103,8 @@ object AkkaBakerConfig extends LazyLogging {
nrOfShards = 50,
retentionCheckInterval = 1.minute,
actorIdleTimeout = Some(5.minutes),
ingredientsFilter = List.empty,
getIngredientsFilter = List.empty,
providedIngredientFilter = List.empty,
journalInitializeTimeout = 30.seconds,
seedNodes = ClusterBakerActorProvider.SeedNodesList(seedNodes),
configuredEncryption = Encryption.NoEncryption,
Expand Down Expand Up @@ -141,7 +143,8 @@ object AkkaBakerConfig extends LazyLogging {
case None | Some("local") =>
new LocalBakerActorProvider(
retentionCheckInterval = config.as[FiniteDuration]("baker.actor.retention-check-interval"),
ingredientsFilter = config.as[List[String]]("baker.filtered-ingredient-values"),
getIngredientsFilter = config.as[List[String]]("baker.filtered-ingredient-values") ++ config.as[List[String]]("baker.filtered-ingredient-values-for-get"),
providedIngredientFilter = config.as[List[String]]("baker.filtered-ingredient-values") ++ config.as[List[String]]("baker.filtered-ingredient-values-for-stream"),
actorIdleTimeout = config.as[Option[FiniteDuration]]("baker.actor.idle-timeout"),
configuredEncryption = encryption,
Timeouts.apply(config)
Expand All @@ -159,7 +162,8 @@ object AkkaBakerConfig extends LazyLogging {
else
ClusterBakerActorProvider.ServiceDiscovery
},
ingredientsFilter = config.as[List[String]]("baker.filtered-ingredient-values"),
getIngredientsFilter = config.as[List[String]]("baker.filtered-ingredient-values") ++ config.as[List[String]]("baker.filtered-ingredient-values-for-get"),
providedIngredientFilter = config.as[List[String]]("baker.filtered-ingredient-values") ++ config.as[List[String]]("baker.filtered-ingredient-values-for-stream"),
configuredEncryption = encryption,
Timeouts.apply(config)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class ClusterBakerActorProvider(
actorIdleTimeout: Option[FiniteDuration],
journalInitializeTimeout: FiniteDuration,
seedNodes: ClusterBootstrapMode,
ingredientsFilter: List[String],
getIngredientsFilter: List[String],
providedIngredientFilter: List[String],
configuredEncryption: Encryption,
timeouts: AkkaBakerConfig.Timeouts,
) extends BakerActorProvider with LazyLogging {
Expand Down Expand Up @@ -117,7 +118,8 @@ class ClusterBakerActorProvider(
configuredEncryption,
interactionManager,
recipeManager,
ingredientsFilter),
getIngredientsFilter,
providedIngredientFilter),
settings = clusterShardingSettings,
extractEntityId = ClusterBakerActorProvider.entityIdExtractor(nrOfShards),
extractShardId = ClusterBakerActorProvider.shardIdExtractor(nrOfShards),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import scala.concurrent.duration._

class LocalBakerActorProvider(
retentionCheckInterval: FiniteDuration,
ingredientsFilter: List[String],
getIngredientsFilter: List[String],
providedIngredientFilter: List[String],
actorIdleTimeout: Option[FiniteDuration],
configuredEncryption: Encryption,
timeouts: AkkaBakerConfig.Timeouts,
Expand All @@ -41,7 +42,8 @@ class LocalBakerActorProvider(
configuredEncryption,
interactionManager,
recipeManager,
ingredientsFilter),
getIngredientsFilter,
providedIngredientFilter),
childName = "ProcessIndexActor",
minBackoff = restartMinBackoff,
maxBackoff = restartMaxBackoff,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ object ProcessIndex {
configuredEncryption: Encryption,
interactions: InteractionManager[IO],
recipeManager: RecipeManager,
ingredientsFilter: Seq[String]): Props =
getIngredientsFilter: Seq[String],
providedIngredientFilter: Seq[String]): Props =
Props(new ProcessIndex(
recipeInstanceIdleTimeout,
retentionCheckInterval,
configuredEncryption,
interactions,
recipeManager,
ingredientsFilter))
getIngredientsFilter,
providedIngredientFilter))

sealed trait ProcessStatus

Expand Down Expand Up @@ -107,7 +109,8 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
configuredEncryption: Encryption,
interactionManager: InteractionManager[IO],
recipeManager: RecipeManager,
ingredientsFilter: Seq[String]) extends PersistentActor with PersistentActorMetrics {
getIngredientsFilter: Seq[String],
providedIngredientFilter: Seq[String]) extends PersistentActor with PersistentActorMetrics {

override val log: DiagnosticLoggingAdapter = Logging.getLogger(logSource = this)

Expand Down Expand Up @@ -193,7 +196,9 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
executionContext = bakerExecutionContext,
encryption = configuredEncryption,
idleTTL = recipeInstanceIdleTimeout,
ingredientsFilter = ingredientsFilter),
getIngredientsFilter = getIngredientsFilter,
providedIngredientFilter = providedIngredientFilter,
),
delayedTransitionActor = delayedTransitionActor
),
childName = recipeInstanceId,
Expand Down Expand Up @@ -453,7 +458,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],

def run(program: ActorRef => FireEventIO[Unit], command: ProcessEvent): Unit = {
val responseHandler = context.actorOf(
SensoryEventResponseHandler(sender(), command, ingredientsFilter))
SensoryEventResponseHandler(sender(), command))
program(responseHandler).value.unsafeRunAsync {
case Left(exception) =>
throw exception // TODO decide what to do, might never happen, except if we generalize it as a runtime for the actor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import com.ing.baker.types.{PrimitiveValue, Value}

object SensoryEventResponseHandler {

def apply(receiver: ActorRef, command: ProcessEvent, ingredientsFilter: Seq[String] = Seq.empty): Props =
Props(new SensoryEventResponseHandler(receiver, command, ingredientsFilter))
def apply(receiver: ActorRef, command: ProcessEvent): Props =
Props(new SensoryEventResponseHandler(receiver, command))
}

/**
Expand All @@ -23,7 +23,7 @@ object SensoryEventResponseHandler {
* - Publishes events to the system event stream
* - Does involving logging
*/
class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ingredientsFilter: Seq[String])
class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent)
extends Actor with ActorMetrics with ActorLogging {

context.setReceiveTimeout(command.timeout)
Expand Down Expand Up @@ -68,7 +68,7 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
def result = SensoryEventResult(
sensoryEventStatus = SensoryEventStatus.Completed,
eventNames = runtimeEvents.map(_.name),
ingredients = filterIngredientValues(runtimeEvents.flatMap(_.providedIngredients).toMap)
ingredients = runtimeEvents.flatMap(_.providedIngredients).toMap
)

command.reaction match {
Expand All @@ -80,13 +80,6 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
stopActor()
}

private def filterIngredientValues(ingredients: Map[String, Value]): Map[String, Value] =
ingredients.map(ingredient =>
if (ingredientsFilter.contains(ingredient._1))
ingredient._1 -> PrimitiveValue("")
else
ingredient)

def rejectWith(rejection: FireSensoryEventRejection): Unit = {
log.debug("Stopping SensoryEventResponseHandler and rejecting request")
log.debug("Reject reason: " + rejection.asReason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ object ProcessInstance {
case class Settings(executionContext: ExecutionContext,
idleTTL: Option[FiniteDuration],
encryption: Encryption,
ingredientsFilter: Seq[String])
getIngredientsFilter: Seq[String],
providedIngredientFilter: Seq[String])

case class IdleStop(seq: Long) extends NoSerializationVerificationNeeded

Expand All @@ -61,6 +62,17 @@ object ProcessInstance {
runtime,
delayedTransitionActor)
)

private def filterIngredients(ingredients: Map[String, Value], ingredientFilter: Seq[String]) = {
val filterAll: Boolean = ingredientFilter.contains("*")
ingredients.map {
ingredient =>
if (filterAll || ingredientFilter.contains(ingredient._1))
ingredient._1 -> PrimitiveValue("")
else
ingredient
}
}
}

/**
Expand All @@ -80,8 +92,6 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](

override val log: DiagnosticLoggingAdapter = Logging.getLogger(this)

val filterAll: Boolean = settings.ingredientsFilter.contains("*")

override def preStart(): Unit = {
log.debug("ProcessInstance started")
}
Expand All @@ -106,7 +116,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
instance.marking.marshall,
instance.state match {
case state: RecipeInstanceState =>
filterIngredientValues(state, settings.ingredientsFilter)
filterIngredientValuesFromState(state)
case _ => instance.state
},
instance.jobs.view.map { case (key, value) => (key, mapJobsToProtocol(value))}.toMap
Expand Down Expand Up @@ -178,13 +188,21 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
context.stop(context.self)
}

private def filterIngredientValuesFromState(state: RecipeInstanceState): RecipeInstanceState = {
state.copy(ingredients = filterIngredients(state.ingredients, settings.getIngredientsFilter))
}

def filterIngredientValues(state: RecipeInstanceState, ingredientFilter: Seq[String]): RecipeInstanceState = {
state.copy(ingredients = state.ingredients.map(ingredient =>
if (filterAll || ingredientFilter.contains(ingredient._1))
ingredient._1 -> PrimitiveValue("")
else
ingredient))
private def filterIngredientValuesFromEventInstance(eventInstance: Any): Any = {
if(eventInstance == null) {
eventInstance
} else eventInstance match {
case casted: EventInstance =>
if (casted.providedIngredients != null && casted.providedIngredients.nonEmpty)
casted.copy(providedIngredients = filterIngredients(casted.providedIngredients, settings.providedIngredientFilter))
else
casted
case _ => eventInstance
}
}

def running(instance: Instance[P, T, S],
Expand Down Expand Up @@ -213,13 +231,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
}

case GetState =>
val instanceState: InstanceState = mapStateToProtocol(instance)
instanceState.state match {
case state: RecipeInstanceState =>
sender() ! instanceState.copy(state = filterIngredientValues(state, settings.ingredientsFilter))
case _ =>
sender() ! instanceState
}
sender() ! mapStateToProtocol(instance)

case GetIngredient(name) =>
instance.state match {
Expand All @@ -241,6 +253,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
context become running(instance, scheduledRetries)})

case event@TransitionFiredEvent(jobId, transitionId, correlationId, timeStarted, timeCompleted, consumed, produced, output) =>

val transition = instance.petriNet.transitions.getById(transitionId)
log.transitionFired(recipeInstanceId, transition.asInstanceOf[Transition], jobId, timeStarted, timeCompleted)
// persist the success event
Expand All @@ -250,7 +263,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
.andThen {
case (updatedInstance, newJobs) =>
// the sender is notified of the transition having fired
sender() ! TransitionFired(jobId, transitionId, correlationId, consumed, produced, newJobs.map(_.id), output)
sender() ! TransitionFired(jobId, transitionId, correlationId, consumed, produced, newJobs.map(_.id), filterIngredientValuesFromEventInstance(output))

// the job is removed from the state since it completed
context become running(updatedInstance, scheduledRetries - jobId)
Expand Down Expand Up @@ -289,7 +302,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
.andThen {
case (updatedInstance, newJobs) =>
// the sender is notified of the transition having fired
sender() ! TransitionFired(jobId, transitionId, None, null, produced, newJobs.map(_.id), out)
sender() ! TransitionFired(jobId, transitionId, None, null, produced, newJobs.map(_.id), filterIngredientValuesFromEventInstance(out))
// the job is removed from the state since it completed
context become running(updatedInstance, scheduledRetries - jobId)
}
Expand Down Expand Up @@ -340,7 +353,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
eventSource.apply(instance)
.andThen(step)
.andThen { case (updatedInstance, newJobs) =>
sender() ! TransitionFired(jobId, transitionId, correlationId, consume, marshallMarking(produced), newJobs.map(_.id), out)
sender() ! TransitionFired(jobId, transitionId, correlationId, consume, marshallMarking(produced), newJobs.map(_.id), filterIngredientValuesFromEventInstance(out))
context become running(updatedInstance, scheduledRetries - jobId)
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn
configuredEncryption = Encryption.NoEncryption,
interactionManager = CachingInteractionManager(),
recipeManager = recipeManager,
Seq.empty,
Seq.empty) {
override def createProcessActor(id: String, compiledRecipe: CompiledRecipe) = {
context.watch(petriNetActorRef)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.scalatestplus.mockito.MockitoSugar
import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable.Seq
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Success
Expand Down Expand Up @@ -55,6 +56,7 @@ object ProcessInstanceSpec {
executionContext = testExecutionContext,
idleTTL = None,
encryption = NoEncryption,
Seq.empty,
Seq.empty
)

Expand Down

0 comments on commit d8197e8

Please sign in to comment.