Skip to content

Commit

Permalink
Added a 'category' in logs (#1227)
Browse files Browse the repository at this point in the history
Also added paymentHash to MDC context
  • Loading branch information
pm47 authored Dec 4, 2019
1 parent 12f95ca commit 167d65b
Show file tree
Hide file tree
Showing 23 changed files with 298 additions and 137 deletions.
89 changes: 84 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,109 @@ package fr.acinq.eclair

import java.util.UUID

import akka.actor.Terminated
import akka.event.DiagnosticLoggingAdapter
import akka.event.Logging.MDC
import akka.io.Tcp
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.blockchain.ValidateResult
import fr.acinq.eclair.channel.{LocalChannelDown, LocalChannelUpdate}
import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
import fr.acinq.eclair.io.{Authenticator, Peer}
import fr.acinq.eclair.router.{ExcludeChannel, GetRoutingState, LiftChannelExclusion, Rebroadcast, RouteRequest, SyncProgress, TickBroadcast, TickPruneStaleChannels}
import fr.acinq.eclair.wire.{ChannelReestablish, Ping, Pong, RoutingMessage, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc}

object Logs {

def mdc(remoteNodeId_opt: Option[PublicKey] = None, channelId_opt: Option[ByteVector32] = None, parentPaymentId_opt: Option[UUID] = None, paymentId_opt: Option[UUID] = None): MDC =
def mdc(category_opt: Option[LogCategory] = None, remoteNodeId_opt: Option[PublicKey] = None, channelId_opt: Option[ByteVector32] = None, parentPaymentId_opt: Option[UUID] = None, paymentId_opt: Option[UUID] = None, paymentHash_opt: Option[ByteVector32] = None): MDC =
Seq(
category_opt.map(l => "category" -> s" ${l.category}"),
remoteNodeId_opt.map(n => "nodeId" -> s" n:$n"), // nb: we preformat MDC values so that there is no white spaces in logs when they are not defined
channelId_opt.map(c => "channelId" -> s" c:$c"),
parentPaymentId_opt.map(p => "parentPaymentId" -> s" p:$p"),
paymentId_opt.map(i => "paymentId" -> s" i:$i")
paymentId_opt.map(i => "paymentId" -> s" i:$i"),
paymentHash_opt.map(h => "paymentHash" -> s" h:$h")
).flatten.toMap

def withMdc(mdc: MDC)(f: => Any)(implicit log: DiagnosticLoggingAdapter) = {
/**
* Temporarily add the provided MDC to the current one, and then restore the original one.
*
* This is useful in some cases where we can't rely on the `aroundReceive` trick to set the MDC before processing a
* message because we don't have enough context. That's typically the case when handling `Terminated` messages.
*/
def withMdc(log: DiagnosticLoggingAdapter)(mdc: MDC)(f: => Any) = {
val mdc0 = log.mdc // backup the current mdc
try {
log.mdc(mdc)
log.mdc(mdc0 ++ mdc) // add the new mdc to the current one
f
} finally {
log.clearMDC()
log.mdc(mdc0) // restore the original mdc
}
}

// @formatter: off
sealed trait LogCategory {
def category: String
}

object LogCategory {

case object CONNECTION extends LogCategory {
override def category: String = "CON"
}

case object ROUTING_SYNC extends LogCategory {
override def category: String = "SYN"
}

case object PAYMENT extends LogCategory {
override def category: String = "PAY"
}

def apply(currentMessage: Any): Option[LogCategory] = {
currentMessage match {
case _: Tcp.Event => Some(Logs.LogCategory.CONNECTION)
case _: Tcp.Message => Some(Logs.LogCategory.CONNECTION)
case _: ChannelReestablish => Some(LogCategory.CONNECTION)

case _: UpdateAddHtlc => Some(Logs.LogCategory.PAYMENT)
case _: UpdateFulfillHtlc => Some(Logs.LogCategory.PAYMENT)
case _: UpdateFailHtlc => Some(Logs.LogCategory.PAYMENT)
case _: UpdateFailMalformedHtlc => Some(Logs.LogCategory.PAYMENT)

case _: ExcludeChannel => Some(LogCategory.PAYMENT)
case _: LiftChannelExclusion => Some(LogCategory.PAYMENT)
case _: SyncProgress => Some(LogCategory.ROUTING_SYNC)
case GetRoutingState => Some(LogCategory.ROUTING_SYNC)
case _: LocalChannelUpdate => Some(LogCategory.ROUTING_SYNC)
case _: LocalChannelDown => Some(LogCategory.ROUTING_SYNC)
case _: ValidateResult => Some(LogCategory.ROUTING_SYNC)
case _: RouteRequest => Some(LogCategory.PAYMENT)
case _: PeerRoutingMessage => Some(LogCategory.ROUTING_SYNC)
case _: RoutingMessage => Some(LogCategory.ROUTING_SYNC)
case TickBroadcast => Some(LogCategory.ROUTING_SYNC)
case TickPruneStaleChannels => Some(LogCategory.ROUTING_SYNC)

case _: Authenticator.Authenticated => Some(LogCategory.CONNECTION)
case _: Authenticator.PendingAuth => Some(LogCategory.CONNECTION)
case _: HandshakeCompleted => Some(LogCategory.CONNECTION)
case _: Peer.Connect => Some(LogCategory.CONNECTION)
case _: Peer.Disconnect => Some(LogCategory.CONNECTION)
case _: Peer.DelayedRebroadcast => Some(LogCategory.ROUTING_SYNC)
case Peer.Reconnect => Some(LogCategory.CONNECTION)
case _: Ping => Some(LogCategory.CONNECTION)
case _: Pong => Some(LogCategory.CONNECTION)
case _: wire.Init => Some(LogCategory.CONNECTION)
case _: Rebroadcast => Some(LogCategory.ROUTING_SYNC)

case _ => None
}
}
}

// @formatter: on
}

// we use a dedicated class so that the logging can be independently adjusted
Expand Down
10 changes: 7 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import akka.event.Logging.MDC
import akka.pattern.pipe
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey, sha256}
import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, Script, ScriptFlags, Transaction}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
Expand Down Expand Up @@ -99,7 +100,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
import nodeParams.keyManager

// we pass these to helpers classes so that they have the logging context
implicit def implicitLog: akka.event.LoggingAdapter = log
implicit def implicitLog: akka.event.DiagnosticLoggingAdapter = diagLog

val forwarder = context.actorOf(Props(new Forwarder(nodeParams)), "forwarder")

Expand Down Expand Up @@ -1682,7 +1683,9 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
case (WAIT_FOR_INIT_INTERNAL, OFFLINE, _, normal: DATA_NORMAL) =>
log.info(s"re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.info(s"re-emitting channel_update={} enabled={} ", normal.channelUpdate, Announcements.isEnabled(normal.channelUpdate.channelFlags))
}
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.commitments))
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
Expand Down Expand Up @@ -2272,11 +2275,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
def now = Platform.currentTime.milliseconds.toSeconds

override def mdc(currentMessage: Any): MDC = {
val category_opt = LogCategory(currentMessage)
val id = currentMessage match {
case INPUT_RESTORED(data) => data.channelId
case _ => Helpers.getChannelId(stateData)
}
Logs.mdc(remoteNodeId_opt = Some(remoteNodeId), channelId_opt = Some(id))
Logs.mdc(category_opt, remoteNodeId_opt = Some(remoteNodeId), channelId_opt = Some(id))
}

// we let the peer decide what to do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package fr.acinq.eclair.channel

import akka.event.LoggingAdapter
import akka.event.{DiagnosticLoggingAdapter, LoggingAdapter}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey, ripemd160, sha256}
import fr.acinq.bitcoin.Script._
import fr.acinq.bitcoin._
Expand Down Expand Up @@ -163,10 +163,12 @@ object Helpers {
* @param currentUpdateTimestamp
* @return the delay until the next update
*/
def nextChannelUpdateRefresh(currentUpdateTimestamp: Long)(implicit log: LoggingAdapter): FiniteDuration = {
def nextChannelUpdateRefresh(currentUpdateTimestamp: Long)(implicit log: DiagnosticLoggingAdapter): FiniteDuration = {
val age = Platform.currentTime.milliseconds - currentUpdateTimestamp.seconds
val delay = 0.days.max(REFRESH_CHANNEL_UPDATE_INTERVAL - age)
log.info("current channel_update was created {} days ago, will refresh it in {} days", age.toDays, delay.toDays)
Logs.withMdc(log)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.info("current channel_update was created {} days ago, will refresh it in {} days", age.toDays, delay.toDays)
}
delay
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import akka.io.Tcp
import akka.util.ByteString
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.Protocol
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.crypto.Noise._
import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, _}
import fr.acinq.eclair.{Diagnostics, FSMDiagnosticActorLogging, Logs}
Expand Down Expand Up @@ -64,7 +65,9 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[ByteVector], co
case _ => None
}

wireLog.mdc(Logs.mdc(remoteNodeId_opt, channelId_opt))
val category_opt = LogCategory(message)

wireLog.mdc(Logs.mdc(category_opt, remoteNodeId_opt, channelId_opt))
if (channelId_opt.isDefined) {
// channel-related messages are logged as info
wireLog.info(s"$direction msg={}", message)
Expand Down Expand Up @@ -266,7 +269,10 @@ class TransportHandler[T: ClassTag](keyPair: KeyPair, rs: Option[ByteVector], co

initialize()

override def mdc(currentMessage: Any): MDC = Logs.mdc(remoteNodeId_opt = remoteNodeId_opt)
override def mdc(currentMessage: Any): MDC = {
val category_opt = LogCategory(currentMessage)
Logs.mdc(category_opt, remoteNodeId_opt = remoteNodeId_opt)
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef, DiagnosticActorLogging, OneForOneStrategy, Props, Status, SupervisorStrategy, Terminated}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.crypto.Noise.KeyPair
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted
Expand Down Expand Up @@ -82,7 +83,7 @@ class Authenticator(nodeParams: NodeParams) extends Actor with DiagnosticActorLo
case HandshakeCompleted(_, _, remoteNodeId) => Some(remoteNodeId)
case _ => None
}
Logs.mdc(remoteNodeId_opt = remoteNodeId_opt)
Logs.mdc(Some(LogCategory.CONNECTION), remoteNodeId_opt = remoteNodeId_opt)
}
}

Expand Down
7 changes: 4 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.event.Logging.MDC
import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.io.Client.ConnectionFailed
import fr.acinq.eclair.tor.Socks5Connection.{Socks5Connect, Socks5Connected, Socks5Error}
import fr.acinq.eclair.tor.{Socks5Connection, Socks5ProxyParams}
Expand Down Expand Up @@ -101,16 +102,16 @@ class Client(nodeParams: NodeParams, authenticator: ActorRef, remoteAddress: Ine
// we should not restart a failing socks client
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case t =>
Logs.withMdc(Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))) {
Logs.withMdc(log)(Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))) {
t match {
case Socks5Error(msg) => log.info(s"SOCKS5 error: $msg")
case _ => log.error(t, "")
}
}(log)
}
SupervisorStrategy.Stop
}

override def mdc(currentMessage: Any): MDC = Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))
override def mdc(currentMessage: Any): MDC = Logs.mdc(Some(LogCategory.CONNECTION), remoteNodeId_opt = Some(remoteNodeId))

private def str(address: InetSocketAddress): String = s"${address.getHostString}:${address.getPort}"

Expand Down
14 changes: 10 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import akka.event.Logging.MDC
import akka.util.Timeout
import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, Satoshi}
import fr.acinq.bitcoin.{ByteVector32, Crypto, DeterministicWallet, Satoshi}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.crypto.TransportHandler.HandshakeCompleted
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{wire, _}
Expand All @@ -36,7 +38,7 @@ import scodec.bits.ByteVector

import scala.compat.Platform
import scala.concurrent.duration._
import scala.util.Random
import scala.util.{Failure, Random, Success, Try}

/**
* Created by PM on 26/08/2016.
Expand Down Expand Up @@ -186,7 +188,9 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
stay

case Event(Terminated(actor), d: InitializingData) if actor == d.transport =>
log.warning(s"lost connection to $remoteNodeId")
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.warning(s"lost connection to $remoteNodeId")
}
goto(DISCONNECTED) using DisconnectedData(d.address_opt, d.channels)

case Event(Terminated(actor), d: InitializingData) if d.channels.exists(_._2 == actor) =>
Expand Down Expand Up @@ -577,7 +581,9 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A

initialize()

override def mdc(currentMessage: Any): MDC = Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))
override def mdc(currentMessage: Any): MDC = {
Logs.mdc(LogCategory(currentMessage), remoteNodeId_opt = Some(remoteNodeId))
}
}

object Peer {
Expand Down
10 changes: 6 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, DiagnosticActorLogging, Props}
import akka.event.Logging.MDC
import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.{Logs, NodeParams}
import kamon.Kamon

import scala.concurrent.Promise

/**
* Created by PM on 27/10/2015.
*/
class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocketAddress, bound: Option[Promise[Done]] = None) extends Actor with ActorLogging {
class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocketAddress, bound: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging {

import Tcp._
import context.system
Expand Down Expand Up @@ -59,7 +61,7 @@ class Server(nodeParams: NodeParams, authenticator: ActorRef, address: InetSocke
listener ! ResumeAccepting(batchSize = 1)
}

override def unhandled(message: Any): Unit = log.warning(s"unhandled message=$message")
override def mdc(currentMessage: Any): MDC = Logs.mdc(Some(LogCategory.CONNECTION))
}

object Server {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package fr.acinq.eclair.payment.receive

import akka.actor.Actor.Receive
import akka.actor.{ActorContext, ActorRef}
import akka.event.LoggingAdapter
import akka.event.DiagnosticLoggingAdapter

/**
* Simple handler that forwards all messages to an actor
*/
class ForwardHandler(actor: ActorRef) extends ReceiveHandler {
override def handle(implicit ctx: ActorContext, log: LoggingAdapter): Receive = { case msg => actor forward msg}
override def handle(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Receive = { case msg => actor forward msg}
}
Loading

0 comments on commit 167d65b

Please sign in to comment.