Skip to content

Commit

Permalink
Better deadLetter logging of wrapped messages, akka#28109 (akka#28253)
Browse files Browse the repository at this point in the history
* specifically AdaptMessage that is used for Typed messageAdapter
  and ActorContext.ask
  • Loading branch information
patriknw authored and navaro1 committed Dec 17, 2019
1 parent 2090742 commit 1365c68
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.typed.eventstream.EventStream
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
import org.slf4j.event.Level

object MessageAdapterSpec {
val config = ConfigFactory.parseString("""
akka.log-dead-letters = off
akka.log-dead-letters = on
ping-pong-dispatcher {
executor = thread-pool-executor
type = PinnedDispatcher
Expand Down Expand Up @@ -272,4 +273,29 @@ class MessageAdapterSpec

}

"log wrapped message of DeadLetter" in {
case class Ping(sender: ActorRef[Pong])
case class Pong(greeting: String)
case class PingReply(response: Pong)

val pingProbe = createTestProbe[Ping]()

val snitch = Behaviors.setup[PingReply] { context =>
val replyTo = context.messageAdapter[Pong](PingReply)
pingProbe.ref ! Ping(replyTo)
Behaviors.stopped
}
val ref = spawn(snitch)

createTestProbe().expectTerminated(ref)

LoggingTestKit.empty
.withLogLevel(Level.INFO)
.withMessageRegex("Pong.*wrapped in.*AdaptMessage.*dead letters encountered")
.expect {
pingProbe.receiveMessage().sender ! Pong("hi")
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# 28109 WrappedMessage for better dead letter logging

ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.AdaptMessage.msg")
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.actor.typed.internal

import akka.actor.WrappedMessage
import akka.annotation.InternalApi

/**
Expand All @@ -22,6 +23,8 @@ import akka.annotation.InternalApi
* function. Used by `ActorContext.spawnMessageAdapter` and `ActorContext.ask` so that the function is
* applied in the "parent" actor (for better thread safety)..
*/
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U => T) extends InternalMessage {
def adapt(): T = adapter(msg)
@InternalApi private[akka] final case class AdaptMessage[U, T](message: U, adapter: U => T)
extends InternalMessage
with WrappedMessage {
def adapt(): T = adapter(message)
}
1 change: 1 addition & 0 deletions akka-actor/src/main/scala/akka/actor/Actor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ final case class UnhandledMessage(
@BeanProperty sender: ActorRef,
@BeanProperty recipient: ActorRef)
extends NoSerializationVerificationNeeded
with WrappedMessage

/**
* Classes for passing status back to the sender.
Expand Down
24 changes: 23 additions & 1 deletion akka-actor/src/main/scala/akka/actor/ActorRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
* Subscribe to this class to be notified about all [[DeadLetter]] (also the suppressed ones)
* and [[Dropped]].
*/
sealed trait AllDeadLetters {
sealed trait AllDeadLetters extends WrappedMessage {
def message: Any
def sender: ActorRef
def recipient: ActorRef
Expand Down Expand Up @@ -533,6 +533,28 @@ object Dropped {
Dropped(message, reason, ActorRef.noSender, recipient)
}

object WrappedMessage {

/**
* Unwrap [[WrappedMessage]] recursively.
*/
@tailrec def unwrap(message: Any): Any = {
message match {
case w: WrappedMessage => unwrap(w.message)
case _ => message

}
}
}

/**
* Message envelopes may implement this trait for better logging, such as logging of
* message class name of the wrapped message instead of the envelope class name.
*/
trait WrappedMessage {
def message: Any
}

private[akka] object DeadLetterActorRef {
@SerialVersionUID(1L)
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
Expand Down
5 changes: 4 additions & 1 deletion akka-actor/src/main/scala/akka/actor/ActorSelection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,15 @@ private[akka] final case class ActorSelectionMessage(
elements: immutable.Iterable[SelectionPathElement],
wildcardFanOut: Boolean)
extends AutoReceivedMessage
with PossiblyHarmful {
with PossiblyHarmful
with WrappedMessage {

def identifyRequest: Option[Identify] = msg match {
case x: Identify => Some(x)
case _ => None
}

override def message: Any = msg
}

/**
Expand Down
8 changes: 6 additions & 2 deletions akka-actor/src/main/scala/akka/event/DeadLetterListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import akka.actor.AllDeadLetters
import akka.actor.DeadLetter
import akka.actor.DeadLetterActorRef
import akka.actor.Dropped
import akka.actor.WrappedMessage
import akka.event.Logging.Info
import akka.util.PrettyDuration._

Expand Down Expand Up @@ -96,13 +97,16 @@ class DeadLetterListener extends Actor {

private def logDeadLetter(d: AllDeadLetters, doneMsg: String): Unit = {
val origin = if (isReal(d.sender)) s" from ${d.sender}" else ""
val unwrapped = WrappedMessage.unwrap(d.message)
val messageStr = unwrapped.getClass.getName
val wrappedIn = if (d.message.isInstanceOf[WrappedMessage]) s" wrapped in [${d.message.getClass.getName}]" else ""
val logMessage = d match {
case dropped: Dropped =>
val destination = if (isReal(d.recipient)) s" to ${d.recipient}" else ""
s"Message [${d.message.getClass.getName}]$origin$destination was dropped. ${dropped.reason}. " +
s"Message [$messageStr]$wrappedIn$origin$destination was dropped. ${dropped.reason}. " +
s"[$count] dead letters encountered$doneMsg. "
case _ =>
s"Message [${d.message.getClass.getName}]$origin to ${d.recipient} was not delivered. " +
s"Message [$messageStr]$wrappedIn$origin to ${d.recipient} was not delivered. " +
s"[$count] dead letters encountered$doneMsg. " +
s"If this is not an expected behavior then ${d.recipient} may have terminated unexpectedly. "
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.routing

import scala.collection.immutable

import akka.dispatch.Dispatchers
import com.typesafe.config.Config
import akka.actor.SupervisorStrategy
Expand All @@ -13,10 +14,13 @@ import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.ActorSystem
import java.util.concurrent.atomic.AtomicReference

import akka.serialization.SerializationExtension
import scala.util.control.NonFatal

import akka.event.Logging
import akka.actor.ActorPath
import akka.actor.WrappedMessage

object ConsistentHashingRouter {

Expand Down Expand Up @@ -51,7 +55,8 @@ object ConsistentHashingRouter {
@SerialVersionUID(1L)
final case class ConsistentHashableEnvelope(message: Any, hashKey: Any)
extends ConsistentHashable
with RouterEnvelope {
with RouterEnvelope
with WrappedMessage {
override def consistentHashKey: Any = hashKey
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.cluster.sharding.typed

import akka.actor.WrappedMessage
import akka.util.unused

object ShardingMessageExtractor {
Expand Down Expand Up @@ -105,4 +106,4 @@ abstract class HashCodeNoEnvelopeMessageExtractor[M](val numberOfShards: Int) ex
* The alternative way of routing messages through sharding is to not use envelopes,
* and have the message types themselves carry identifiers.
*/
final case class ShardingEnvelope[M](entityId: String, message: M) // TODO think if should remain a case class
final case class ShardingEnvelope[M](entityId: String, message: M) extends WrappedMessage
Original file line number Diff line number Diff line change
Expand Up @@ -190,23 +190,32 @@ object DistributedPubSubMediator {
@SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe) extends DeadLetterSuppression
@SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe)
@SerialVersionUID(1L) final case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean)
extends DistributedPubSubMessage {
extends DistributedPubSubMessage
with WrappedMessage {
def this(topic: String, msg: Any) = this(topic, msg, sendOneMessageToEachGroup = false)

override def message: Any = msg
}
object Publish {
def apply(topic: String, msg: Any) = new Publish(topic, msg)
}
@SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean)
extends DistributedPubSubMessage {
extends DistributedPubSubMessage
with WrappedMessage {

/**
* Convenience constructor with `localAffinity` false
*/
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)

override def message: Any = msg
}
@SerialVersionUID(1L) final case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false)
extends DistributedPubSubMessage {
extends DistributedPubSubMessage
with WrappedMessage {
def this(path: String, msg: Any) = this(path, msg, allButSelf = false)

override def message: Any = msg
}

sealed abstract class GetTopics
Expand Down

0 comments on commit 1365c68

Please sign in to comment.