diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index f90ad7ff51..1ac72fda6b 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -24,6 +24,33 @@ Existing `static_remote_key` channels will continue to work. You can override th Eclair will not allow remote peers to open new obsolete channels that do not support `option_static_remotekey`. +### Local reputation and HTLC endorsement + +To protect against jamming attacks, eclair gives a reputation to its neighbors and uses it to decide if a HTLC should be relayed given how congested the outgoing channel is. +The reputation is basically how much this node paid us in fees divided by how much they should have paid us for the liquidity and slots that they blocked. +The reputation is per incoming node and endorsement level. +The confidence that the HTLC will be fulfilled is transmitted to the next node using the endorsement TLV of the `update_add_htlc` message. +Note that HTLCs that are considered dangerous are still relayed: this is the first phase of a network-wide experimentation aimed at collecting data. + +To configure, edit `eclair.conf`: + +```eclair.conf +// We assign reputations to our peers to prioritize payments during congestion. +// The reputation is computed as fees paid divided by what should have been paid if all payments were successful. +eclair.relay.peer-reputation { + // Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement + // value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md, + enabled = true + // Reputation decays with the following half life to emphasize recent behavior. + half-life = 7 days + // Payments that stay pending for longer than this get penalized + max-relay-duration = 12 seconds + // Pending payments are counted as failed, and because they could potentially stay pending for a very long time, + // the following multiplier is applied. + pending-multiplier = 1000 // A pending payment counts as a thousand failed ones. +} +``` + ### API changes diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index fd58844a1d..131b1e4621 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -237,6 +237,23 @@ eclair { // Number of blocks before the incoming HTLC expires that an async payment must be triggered by the receiver cancel-safety-before-timeout-blocks = 144 } + + // We assign reputation to our peers to prioritize payments during congestion. + // The reputation is computed as fees paid divided by what should have been paid if all payments were successful. + peer-reputation { + // Set this parameter to false to disable the reputation algorithm and simply relay the incoming endorsement + // value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md, + enabled = true + // Reputation decays with the following half life to emphasize recent behavior. + half-life = 15 days + // Payments that stay pending for longer than this get penalized. + max-relay-duration = 12 seconds + // Pending payments are counted as failed, and because they could potentially stay pending for a very long time, + // the following multiplier is applied. We want it to be as close as possible to the true cost of a worst case + // HTLC (max-cltv-delta / max-relay-duration, around 100000 with default parameters) while still being comparable + // to the number of HTLCs received per peer during twice the half life. + pending-multiplier = 200 // A pending payment counts as two hundred failed ones. + } } on-chain-fees { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 1c66625b53..400995784b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -31,6 +31,7 @@ import fr.acinq.eclair.io.MessageRelay.{RelayAll, RelayChannelsOnly, RelayPolicy import fr.acinq.eclair.io.PeerConnection import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} +import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Announcements.AddressException import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios} import fr.acinq.eclair.router.Router._ @@ -561,7 +562,13 @@ object NodeParams extends Logging { privateChannelFees = getRelayFees(config.getConfig("relay.fees.private-channels")), minTrampolineFees = getRelayFees(config.getConfig("relay.fees.min-trampoline")), enforcementDelay = FiniteDuration(config.getDuration("relay.fees.enforcement-delay").getSeconds, TimeUnit.SECONDS), - asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks) + asyncPaymentsParams = AsyncPaymentsParams(asyncPaymentHoldTimeoutBlocks, asyncPaymentCancelSafetyBeforeTimeoutBlocks), + peerReputationConfig = Reputation.Config( + enabled = config.getBoolean("relay.peer-reputation.enabled"), + halfLife = FiniteDuration(config.getDuration("relay.peer-reputation.half-life").getSeconds, TimeUnit.SECONDS), + maxRelayDuration = FiniteDuration(config.getDuration("relay.peer-reputation.max-relay-duration").getSeconds, TimeUnit.SECONDS), + pendingMultiplier = config.getDouble("relay.peer-reputation.pending-multiplier"), + ), ), db = database, autoReconnect = config.getBoolean("auto-reconnect"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index b6ca12c5e5..e2dfcf810f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -44,6 +44,7 @@ import fr.acinq.eclair.payment.offer.OfferManager import fr.acinq.eclair.payment.receive.PaymentHandler import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer} import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator} +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router._ import fr.acinq.eclair.tor.{Controller, TorProtocolHandler} import fr.acinq.eclair.wire.protocol.NodeAddress @@ -360,7 +361,12 @@ class Setup(val datadir: File, offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager") paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume)) triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer") - relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume)) + reputationRecorder_opt = if (nodeParams.relayParams.peerReputationConfig.enabled) { + Some(system.spawn(Behaviors.supervise(ReputationRecorder(nodeParams.relayParams.peerReputationConfig, Map.empty)).onFailure(typed.SupervisorStrategy.resume), name = "reputation-recorder")) + } else { + None + } + relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder_opt, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume)) _ = relayer ! PostRestartHtlcCleaner.Init(channels) // Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system, // we want to make sure the handler for post-restart broken HTLCs has finished initializing. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala index cea5739003..32575dea99 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala @@ -118,6 +118,8 @@ case class ExpiryTooBig (override val channelId: Byte case class HtlcValueTooSmall (override val channelId: ByteVector32, minimum: MilliSatoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"htlc value too small: minimum=$minimum actual=$actual") case class HtlcValueTooHighInFlight (override val channelId: ByteVector32, maximum: MilliSatoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"in-flight htlcs hold too much value: maximum=$maximum actual=$actual") case class TooManyAcceptedHtlcs (override val channelId: ByteVector32, maximum: Long) extends ChannelException(channelId, s"too many accepted htlcs: maximum=$maximum") +case class TooManySmallHtlcs (override val channelId: ByteVector32, number: Long, below: MilliSatoshi) extends ChannelException(channelId, s"too many small htlcs: $number HTLCs below $below") +case class ConfidenceTooLow (override val channelId: ByteVector32, confidence: Double, occupancy: Double) extends ChannelException(channelId, s"confidence too low: confidence=$confidence occupancy=$occupancy") case class LocalDustHtlcExposureTooHigh (override val channelId: ByteVector32, maximum: Satoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"dust htlcs hold too much value: maximum=$maximum actual=$actual") case class RemoteDustHtlcExposureTooHigh (override val channelId: ByteVector32, maximum: Satoshi, actual: MilliSatoshi) extends ChannelException(channelId, s"dust htlcs hold too much value: maximum=$maximum actual=$actual") case class InsufficientFunds (override val channelId: ByteVector32, amount: MilliSatoshi, missing: Satoshi, reserve: Satoshi, fees: Satoshi) extends ChannelException(channelId, s"insufficient funds: missing=$missing reserve=$reserve fees=$fees") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index da1eb5c7f5..10d3207592 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -429,7 +429,7 @@ case class Commitment(fundingTxIndex: Long, localCommit.spec.htlcs.collect(DirectedHtlc.incoming).filter(nearlyExpired) } - def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, Unit] = { + def canSendAdd(amount: MilliSatoshi, params: ChannelParams, changes: CommitmentChanges, feerates: FeeratesPerKw, feeConf: OnChainFeeConf, confidence: Double): Either[ChannelException, Unit] = { // we allowed mismatches between our feerates and our remote's as long as commitments didn't contain any HTLC at risk // we need to verify that we're not disagreeing on feerates anymore before offering new HTLCs // NB: there may be a pending update_fee that hasn't been applied yet that needs to be taken into account @@ -488,7 +488,8 @@ case class Commitment(fundingTxIndex: Long, if (allowedHtlcValueInFlight < htlcValueInFlight) { return Left(HtlcValueTooHighInFlight(params.channelId, maximum = allowedHtlcValueInFlight, actual = htlcValueInFlight)) } - if (Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min < outgoingHtlcs.size) { + val maxAcceptedHtlcs = params.localParams.maxAcceptedHtlcs.min(params.remoteParams.maxAcceptedHtlcs) + if (maxAcceptedHtlcs < outgoingHtlcs.size) { return Left(TooManyAcceptedHtlcs(params.channelId, maximum = Seq(params.localParams.maxAcceptedHtlcs, params.remoteParams.maxAcceptedHtlcs).min)) } @@ -505,6 +506,18 @@ case class Commitment(fundingTxIndex: Long, return Left(RemoteDustHtlcExposureTooHigh(params.channelId, maxDustExposure, remoteDustExposureAfterAdd)) } + // Jamming protection + // Must be the last checks so that they can be ignored for shadow deployment. + for ((amountMsat, i) <- outgoingHtlcs.toSeq.map(_.amountMsat).sorted.zipWithIndex) { + if ((amountMsat.toLong < 1) || (math.log(amountMsat.toLong.toDouble) * maxAcceptedHtlcs / math.log(params.localParams.maxHtlcValueInFlightMsat.toLong.toDouble / maxAcceptedHtlcs) < i)) { + return Left(TooManySmallHtlcs(params.channelId, number = i + 1, below = amountMsat)) + } + } + val occupancy = (outgoingHtlcs.size.toDouble / maxAcceptedHtlcs).max(htlcValueInFlight.toLong.toDouble / allowedHtlcValueInFlight.toLong.toDouble) + if (confidence + 0.05 < occupancy) { + return Left(ConfidenceTooLow(params.channelId, confidence, occupancy)) + } + Right(()) } @@ -835,7 +848,7 @@ case class Commitments(params: ChannelParams, * @param cmd add HTLC command * @return either Left(failure, error message) where failure is a failure message (see BOLT #4 and the Failure Message class) or Right(new commitments, updateAddHtlc) */ - def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf): Either[ChannelException, (Commitments, UpdateAddHtlc)] = { + def sendAdd(cmd: CMD_ADD_HTLC, currentHeight: BlockHeight, channelConf: ChannelConf, feerates: FeeratesPerKw, feeConf: OnChainFeeConf)(implicit log: LoggingAdapter): Either[ChannelException, (Commitments, UpdateAddHtlc)] = { // we must ensure we're not relaying htlcs that are already expired, otherwise the downstream channel will instantly close // NB: we add a 3 blocks safety to reduce the probability of running into this when our bitcoin node is slightly outdated val minExpiry = CltvExpiry(currentHeight + 3) @@ -859,8 +872,24 @@ case class Commitments(params: ChannelParams, val changes1 = changes.addLocalProposal(add).copy(localNextHtlcId = changes.localNextHtlcId + 1) val originChannels1 = originChannels + (add.id -> cmd.origin) // we verify that this htlc is allowed in every active commitment - active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf)) - .collectFirst { case Left(f) => Left(f) } + val canSendAdds = active.map(_.canSendAdd(add.amountMsat, params, changes1, feerates, feeConf, cmd.confidence)) + // Log only for jamming protection. + canSendAdds.collectFirst { + case Left(f: TooManySmallHtlcs) => + log.info("TooManySmallHtlcs: {} outgoing HTLCs are below {}}", f.number, f.below) + Metrics.dropHtlc(f, Tags.Directions.Outgoing) + case Left(f: ConfidenceTooLow) => + log.info("ConfidenceTooLow: confidence is {}% while channel is {}% full", (100 * f.confidence).toInt, (100 * f.occupancy).toInt) + Metrics.dropHtlc(f, Tags.Directions.Outgoing) + } + canSendAdds.flatMap { // TODO: We ignore jamming protection, delete this flatMap to activate jamming protection. + case Left(_: TooManySmallHtlcs) | Left(_: ConfidenceTooLow) => None + case x => Some(x) + } + .collectFirst { case Left(f) => + Metrics.dropHtlc(f, Tags.Directions.Outgoing) + Left(f) + } .getOrElse(Right(copy(changes = changes1, originChannels = originChannels1), add)) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala index 748751968c..648a0ce927 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Monitoring.scala @@ -35,6 +35,7 @@ object Monitoring { val RemoteFeeratePerByte = Kamon.histogram("channels.remote-feerate-per-byte") val Splices = Kamon.histogram("channels.splices", "Splices") val ProcessMessage = Kamon.timer("channels.messages-processed") + val HtlcDropped = Kamon.counter("channels.htlc-dropped") def recordHtlcsInFlight(remoteSpec: CommitmentSpec, previousRemoteSpec: CommitmentSpec): Unit = { for (direction <- Tags.Directions.Incoming :: Tags.Directions.Outgoing :: Nil) { @@ -75,6 +76,10 @@ object Monitoring { Metrics.Splices.withTag(Tags.Origin, Tags.Origins.Remote).withTag(Tags.SpliceType, Tags.SpliceTypes.SpliceCpfp).record(Math.abs(fundingParams.remoteContribution.toLong)) } } + + def dropHtlc(reason: ChannelException, direction: String): Unit = { + HtlcDropped.withTag(Tags.Reason, reason.getClass.getSimpleName).withTag(Tags.Direction, direction).increment() + } } object Tags { @@ -85,6 +90,7 @@ object Monitoring { val State = "state" val CommitmentFormat = "commitment-format" val SpliceType = "splice-type" + val Reason = "reason" object Events { val Created = "created" diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala index 25b6cbe2c9..e53deeddab 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelay.scala @@ -29,6 +29,8 @@ import fr.acinq.eclair.db.PendingCommandsDb import fr.acinq.eclair.payment.Monitoring.{Metrics, Tags} import fr.acinq.eclair.payment.relay.Relayer.{OutgoingChannel, OutgoingChannelParams} import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket} +import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.ReputationRecorder.{CancelRelay, GetConfidence, RecordResult} import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload import fr.acinq.eclair.wire.protocol._ @@ -44,6 +46,7 @@ object ChannelRelay { // @formatter:off sealed trait Command private case object DoRelay extends Command + private case class WrappedConfidence(confidence: Double) extends Command private case class WrappedForwardFailure(failure: Register.ForwardFailure[CMD_ADD_HTLC]) extends Command private case class WrappedAddResponse(res: CommandResponse[CMD_ADD_HTLC]) extends Command // @formatter:on @@ -56,8 +59,9 @@ object ChannelRelay { def apply(nodeParams: NodeParams, register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], channels: Map[ByteVector32, Relayer.OutgoingChannel], - originNode:PublicKey, + originNode: PublicKey, relayId: UUID, r: IncomingPaymentPacket.ChannelRelayPacket): Behavior[Command] = Behaviors.setup { context => @@ -67,9 +71,18 @@ object ChannelRelay { paymentHash_opt = Some(r.add.paymentHash), nodeAlias_opt = Some(nodeParams.alias))) { val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), TimestampMilli.now(), originNode) - context.self ! DoRelay - val confidence = (r.add.endorsement + 0.5) / 8 - new ChannelRelay(nodeParams, register, channels, r, upstream, confidence, context).relay(Seq.empty) + reputationRecorder_opt match { + case Some(reputationRecorder) => + reputationRecorder ! GetConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), originNode, r.add.endorsement, relayId, r.relayFeeMsat) + case None => + val confidence = (r.add.endorsement + 0.5) / 8 + context.self ! WrappedConfidence(confidence) + } + Behaviors.receiveMessagePartial { + case WrappedConfidence(confidence) => + context.self ! DoRelay + new ChannelRelay(nodeParams, register, reputationRecorder_opt, channels, r, upstream, confidence, context, relayId).relay(Seq.empty) + } } } @@ -110,11 +123,13 @@ object ChannelRelay { */ class ChannelRelay private(nodeParams: NodeParams, register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], channels: Map[ByteVector32, Relayer.OutgoingChannel], r: IncomingPaymentPacket.ChannelRelayPacket, upstream: Upstream.Hot.Channel, confidence: Double, - context: ActorContext[ChannelRelay.Command]) { + context: ActorContext[ChannelRelay.Command], + relayId: UUID) { import ChannelRelay._ @@ -134,6 +149,7 @@ class ChannelRelay private(nodeParams: NodeParams, case RelayFailure(cmdFail) => Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) context.log.info("rejecting htlc reason={}", cmdFail.reason) + reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(r.add.channelId, cmdFail) case RelaySuccess(selectedChannelId, cmdAdd) => context.log.info("forwarding htlc #{} from channelId={} to channelId={}", r.add.id, r.add.channelId, selectedChannelId) @@ -149,6 +165,7 @@ class ChannelRelay private(nodeParams: NodeParams, context.log.warn(s"couldn't resolve downstream channel $channelId, failing htlc #${upstream.add.id}") val cmdFail = CMD_FAIL_HTLC(upstream.add.id, Right(UnknownNextPeer()), commit = true) Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel) + reputationRecorder_opt.foreach(_ ! CancelRelay(upstream.receivedFrom, r.add.endorsement, relayId)) safeSendAndStop(upstream.add.channelId, cmdFail) case WrappedAddResponse(addFailed: RES_ADD_FAILED[_]) => @@ -325,9 +342,11 @@ class ChannelRelay private(nodeParams: NodeParams, } } - private def recordRelayDuration(isSuccess: Boolean): Unit = + private def recordRelayDuration(isSuccess: Boolean): Unit = { + reputationRecorder_opt.foreach(_ ! RecordResult(upstream.receivedFrom, r.add.endorsement, relayId, isSuccess)) Metrics.RelayedPaymentDuration .withTag(Tags.Relay, Tags.RelayType.Channel) .withTag(Tags.Success, isSuccess) .record((TimestampMilli.now() - upstream.receivedAt).toMillis, TimeUnit.MILLISECONDS) + } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala index 39d61a22c4..d5d4d11b7f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/ChannelRelayer.scala @@ -16,15 +16,16 @@ package fr.acinq.eclair.payment.relay -import akka.actor.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.Behaviors +import akka.actor.{ActorRef, typed} import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel._ import fr.acinq.eclair.payment.IncomingPaymentPacket -import fr.acinq.eclair.{SubscriptionsComplete, Logs, NodeParams, ShortChannelId} +import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.{Logs, NodeParams, ShortChannelId, SubscriptionsComplete} import java.util.UUID import scala.collection.mutable @@ -58,6 +59,7 @@ object ChannelRelayer { def apply(nodeParams: NodeParams, register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.ChannelRelayCommand]], channels: Map[ByteVector32, Relayer.OutgoingChannel] = Map.empty, scid2channels: Map[ShortChannelId, ByteVector32] = Map.empty, node2channels: mutable.MultiDict[PublicKey, ByteVector32] = mutable.MultiDict.empty): Behavior[Command] = @@ -79,7 +81,7 @@ object ChannelRelayer { case None => Map.empty } context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), nextChannels.keys.mkString(",")) - context.spawn(ChannelRelay.apply(nodeParams, register, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString) + context.spawn(ChannelRelay.apply(nodeParams, register, reputationRecorder_opt, nextChannels, originNode, relayId, channelRelayPacket), name = relayId.toString) Behaviors.same case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) => @@ -100,14 +102,14 @@ object ChannelRelayer { context.log.debug("adding mappings={} to channelId={}", mappings.keys.mkString(","), channelId) val scid2channels1 = scid2channels ++ mappings val node2channels1 = node2channels.addOne(remoteNodeId, channelId) - apply(nodeParams, register, channels1, scid2channels1, node2channels1) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2channels1, node2channels1) case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortIds, remoteNodeId)) => context.log.debug(s"removed local channel info for channelId=$channelId localAlias=${shortIds.localAlias}") val channels1 = channels - channelId val scid2Channels1 = scid2channels - shortIds.localAlias -- shortIds.real.toOption val node2channels1 = node2channels.subtractOne(remoteNodeId, channelId) - apply(nodeParams, register, channels1, scid2Channels1, node2channels1) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2Channels1, node2channels1) case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortIds, commitments)) => val channels1 = channels.get(channelId) match { @@ -116,7 +118,7 @@ object ChannelRelayer { channels + (channelId -> c.copy(commitments = commitments)) case None => channels // we only consider the balance if we have the channel_update } - apply(nodeParams, register, channels1, scid2channels, node2channels) + apply(nodeParams, register, reputationRecorder_opt, channels1, scid2channels, node2channels) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala index 49471f82b3..8710f9fb8f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelay.scala @@ -36,6 +36,7 @@ import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived, import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode import fr.acinq.eclair.payment.send._ +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router.Router.RouteParams import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound} import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload @@ -64,6 +65,7 @@ object NodeRelay { private case class WrappedPaymentFailed(paymentFailed: PaymentFailed) extends Command private[relay] case class WrappedPeerReadyResult(result: AsyncPaymentTriggerer.Result) extends Command private case class WrappedResolvedPaths(resolved: Seq[ResolvedPath]) extends Command + private case class WrappedConfidence(confidence: Double) extends Command // @formatter:on trait OutgoingPaymentFactory { @@ -85,6 +87,7 @@ object NodeRelay { def apply(nodeParams: NodeParams, parent: typed.ActorRef[NodeRelayer.Command], register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], relayId: UUID, nodeRelayPacket: NodeRelayPacket, outgoingPaymentFactory: OutgoingPaymentFactory, @@ -108,7 +111,7 @@ object NodeRelay { case IncomingPaymentPacket.RelayToTrampolinePacket(_, _, _, nextPacket) => Some(nextPacket) case _: IncomingPaymentPacket.RelayToBlindedPathsPacket => None } - new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer, router) + new NodeRelay(nodeParams, parent, register, reputationRecorder_opt, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer, router) .receiving(Queue.empty, nodeRelayPacket.innerPayload, nextPacket_opt, incomingPaymentHandler) } } @@ -183,6 +186,7 @@ object NodeRelay { class NodeRelay private(nodeParams: NodeParams, parent: akka.actor.typed.ActorRef[NodeRelayer.Command], register: ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], relayId: UUID, paymentHash: ByteVector32, paymentSecret: ByteVector32, @@ -259,8 +263,20 @@ class NodeRelay private(nodeParams: NodeParams, private def doSend(upstream: Upstream.Hot.Trampoline, nextPayload: IntermediatePayload.NodeRelay, nextPacket_opt: Option[OnionRoutingPacket]): Behavior[Command] = { context.log.debug(s"relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv})") - val confidence = (upstream.received.map(_.add.endorsement).min + 0.5) / 8 - relay(upstream, nextPayload, nextPacket_opt, confidence) + val totalFee = upstream.amountIn - nextPayload.amountToForward + val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => + fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => + Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) + reputationRecorder_opt match { + case Some(reputationRecorder) => reputationRecorder ! ReputationRecorder.GetTrampolineConfidence(context.messageAdapter[ReputationRecorder.Confidence](confidence => WrappedConfidence(confidence.value)), fees, relayId) + case None => context.self ! WrappedConfidence((upstream.received.map(_.add.endorsement).min + 0.5) / 8) + } + Behaviors.receiveMessagePartial { + rejectExtraHtlcPartialFunction orElse { + case WrappedConfidence(confidence) => + relay(upstream, nextPayload, nextPacket_opt, confidence) + } + } } /** @@ -287,6 +303,11 @@ class NodeRelay private(nodeParams: NodeParams, case WrappedPaymentSent(paymentSent) => context.log.debug("trampoline payment fully resolved downstream") success(upstream, fulfilledUpstream, paymentSent) + val totalFee = upstream.amountIn - paymentSent.amountWithFees + val fees = upstream.received.foldLeft(Map.empty[ReputationRecorder.PeerEndorsement, MilliSatoshi])((fees, r) => + fees.updatedWith(ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement))(fee => + Some(fee.getOrElse(MilliSatoshi(0)) + r.add.amountMsat * totalFee.toLong / upstream.amountIn.toLong))) + reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineSuccess(fees, relayId)) recordRelayDuration(startedAt, isSuccess = true) stopping() case WrappedPaymentFailed(PaymentFailed(_, _, failures, _)) => @@ -294,6 +315,7 @@ class NodeRelay private(nodeParams: NodeParams, if (!fulfilledUpstream) { rejectPayment(upstream, translateError(nodeParams, failures, upstream, nextPayload)) } + reputationRecorder_opt.foreach(_ ! ReputationRecorder.RecordTrampolineFailure(upstream.received.map(r => ReputationRecorder.PeerEndorsement(r.receivedFrom, r.add.endorsement)).toSet, relayId)) recordRelayDuration(startedAt, isSuccess = fulfilledUpstream) stopping() } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala index 20d65b1991..8db24382b4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/NodeRelayer.scala @@ -22,6 +22,7 @@ import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.payment._ +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.{Logs, NodeParams} import java.util.UUID @@ -58,7 +59,13 @@ object NodeRelayer { * NB: the payment secret used here is different from the invoice's payment secret and ensures we can * group together HTLCs that the previous trampoline node sent in the same MPP. */ - def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], router: akka.actor.ActorRef, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] = + def apply(nodeParams: NodeParams, + register: akka.actor.ActorRef, + reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.TrampolineRelayCommand]], + outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, + triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], + router: akka.actor.ActorRef, + children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] = Behaviors.setup { context => Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) { Behaviors.receiveMessage { @@ -73,15 +80,15 @@ object NodeRelayer { case None => val relayId = UUID.randomUUID() context.log.debug(s"spawning a new handler with relayId=$relayId") - val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString) + val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, reputationRecorder_opt, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer, router), relayId.toString) context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId) handler ! NodeRelay.Relay(nodeRelayPacket, originNode) - apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler)) + apply(nodeParams, register, reputationRecorder_opt, outgoingPaymentFactory, triggerer, router, children + (childKey -> handler)) } case RelayComplete(childHandler, paymentHash, paymentSecret) => // we do a back-and-forth between parent and child before stopping the child to prevent a race condition childHandler ! NodeRelay.Stop - apply(nodeParams, register, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret)) + apply(nodeParams, register, reputationRecorder_opt, outgoingPaymentFactory, triggerer, router, children - PaymentKey(paymentHash, paymentSecret)) case GetPendingPayments(replyTo) => replyTo ! children Behaviors.same diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala index f9f5c0039b..b0ac3e2cb5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/Relayer.scala @@ -28,6 +28,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.PendingCommandsDb import fr.acinq.eclair.payment._ +import fr.acinq.eclair.reputation.{Reputation, ReputationRecorder} import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{CltvExpiryDelta, Logs, MilliSatoshi, NodeParams} import grizzled.slf4j.Logging @@ -49,7 +50,7 @@ import scala.util.Random * It also receives channel HTLC events (fulfill / failed) and relays those to the appropriate handlers. * It also maintains an up-to-date view of local channel balances. */ -class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging { +class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.Command]], initialized: Option[Promise[Done]] = None) extends Actor with DiagnosticActorLogging { import Relayer._ @@ -57,8 +58,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym implicit def implicitLog: LoggingAdapter = log private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, register, initialized), "post-restart-htlc-cleaner") - private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register)).onFailure(SupervisorStrategy.resume), "channel-relayer") - private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer") + private val channelRelayer = context.spawn(Behaviors.supervise(ChannelRelayer(nodeParams, register, reputationRecorder_opt)).onFailure(SupervisorStrategy.resume), "channel-relayer") + private val nodeRelayer = context.spawn(Behaviors.supervise(NodeRelayer(nodeParams, register, reputationRecorder_opt, NodeRelay.SimpleOutgoingPaymentFactory(nodeParams, router, register), triggerer, router)).onFailure(SupervisorStrategy.resume), name = "node-relayer") def receive: Receive = { case init: PostRestartHtlcCleaner.Init => postRestartCleaner forward init @@ -120,8 +121,8 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paym object Relayer extends Logging { - def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], initialized: Option[Promise[Done]] = None): Props = - Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, initialized)) + def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, paymentHandler: ActorRef, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], reputationRecorder_opt: Option[typed.ActorRef[ReputationRecorder.Command]], initialized: Option[Promise[Done]] = None): Props = + Props(new Relayer(nodeParams, router, register, paymentHandler, triggerer, reputationRecorder_opt, initialized)) // @formatter:off case class RelayFees(feeBase: MilliSatoshi, feeProportionalMillionths: Long) { @@ -135,7 +136,8 @@ object Relayer extends Logging { privateChannelFees: RelayFees, minTrampolineFees: RelayFees, enforcementDelay: FiniteDuration, - asyncPaymentsParams: AsyncPaymentsParams) { + asyncPaymentsParams: AsyncPaymentsParams, + peerReputationConfig: Reputation.Config) { def defaultFees(announceChannel: Boolean): RelayFees = { if (announceChannel) { publicChannelFees diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala new file mode 100644 index 0000000000..f74c0aca67 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala @@ -0,0 +1,95 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import fr.acinq.eclair.{MilliSatoshi, TimestampMilli} + +import java.util.UUID +import scala.concurrent.duration.FiniteDuration + +/** + * Created by thomash on 21/07/2023. + */ + +/** + * Local reputation for a given incoming node, that should be track for each incoming endorsement level. + * + * @param pastWeight How much fees we would have collected in the past if all payments had succeeded (exponential moving average). + * @param pastScore How much fees we have collected in the past (exponential moving average). + * @param lastSettlementAt Timestamp of the last recorded payment settlement. + * @param pending Set of pending payments (payments may contain multiple HTLCs when using trampoline). + * @param halfLife Half life for the exponential moving average. + * @param maxRelayDuration Duration after which payments are penalized for staying pending too long. + * @param pendingMultiplier How much to penalize pending payments. + */ +case class Reputation(pastWeight: Double, pastScore: Double, lastSettlementAt: TimestampMilli, pending: Map[UUID, Reputation.PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) { + private def decay(now: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife) + + private def pendingWeight(now: TimestampMilli): Double = pending.values.map(_.weight(now, maxRelayDuration, pendingMultiplier)).sum + + /** + * Register a payment to relay and estimate the confidence that it will succeed. + * + * @return (updated reputation, confidence) + */ + def attempt(relayId: UUID, fee: MilliSatoshi, now: TimestampMilli = TimestampMilli.now()): (Reputation, Double) = { + val d = decay(now) + val newReputation = copy(pending = pending + (relayId -> Reputation.PendingPayment(fee, now))) + val confidence = d * pastScore / (d * pastWeight + newReputation.pendingWeight(now)) + (newReputation, confidence) + } + + /** + * Mark a previously registered payment as failed without trying to relay it (usually because its confidence was too low). + * + * @return updated reputation + */ + def cancel(relayId: UUID): Reputation = copy(pending = pending - relayId) + + /** + * When a payment is settled, we record whether it succeeded and how long it took. + * + * @param feeOverride When relaying trampoline payments, the actual fee is only known when the payment succeeds. This + * is used instead of the fee upper bound that was known when first attempting the relay. + * @return updated reputation + */ + def record(relayId: UUID, isSuccess: Boolean, feeOverride: Option[MilliSatoshi] = None, now: TimestampMilli = TimestampMilli.now()): Reputation = { + pending.get(relayId) match { + case Some(p) => + val d = decay(now) + val p1 = p.copy(fee = feeOverride.getOrElse(p.fee)) + val newWeight = d * pastWeight + p1.weight(now, maxRelayDuration, 1.0) + val newScore = d * pastScore + (if (isSuccess) p1.fee.toLong.toDouble else 0) + Reputation(newWeight, newScore, now, pending - relayId, halfLife, maxRelayDuration, pendingMultiplier) + case None => this + } + } +} + +object Reputation { + /** We're relaying that payment and are waiting for it to settle. */ + case class PendingPayment(fee: MilliSatoshi, startedAt: TimestampMilli) { + def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = { + val duration = now - startedAt + fee.toLong.toDouble * (duration / minDuration).max(multiplier) + } + } + + case class Config(enabled: Boolean, halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) + + def init(config: Config): Reputation = Reputation(0.0, 0.0, TimestampMilli.min, Map.empty, config.halfLife, config.maxRelayDuration, config.pendingMultiplier) +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala new file mode 100644 index 0000000000..77982ebfbe --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, Behavior} +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.eclair.MilliSatoshi + +import java.util.UUID + +/** + * Created by thomash on 21/07/2023. + */ + +object ReputationRecorder { + // @formatter:off + sealed trait Command + + sealed trait ChannelRelayCommand extends Command + case class GetConfidence(replyTo: ActorRef[Confidence], originNode: PublicKey, endorsement: Int, relayId: UUID, fee: MilliSatoshi) extends ChannelRelayCommand + case class CancelRelay(originNode: PublicKey, endorsement: Int, relayId: UUID) extends ChannelRelayCommand + case class RecordResult(originNode: PublicKey, endorsement: Int, relayId: UUID, isSuccess: Boolean) extends ChannelRelayCommand + + sealed trait TrampolineRelayCommand extends Command + case class GetTrampolineConfidence(replyTo: ActorRef[Confidence], fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand + case class RecordTrampolineFailure(upstream: Set[PeerEndorsement], relayId: UUID) extends TrampolineRelayCommand + case class RecordTrampolineSuccess(fees: Map[PeerEndorsement, MilliSatoshi], relayId: UUID) extends TrampolineRelayCommand + // @formatter:on + + /** + * @param nodeId nodeId of the upstream peer. + * @param endorsement endorsement value set by the upstream peer in the HTLC we received. + */ + case class PeerEndorsement(nodeId: PublicKey, endorsement: Int) + + /** Confidence that the outgoing HTLC will succeed. */ + case class Confidence(value: Double) + + def apply(reputationConfig: Reputation.Config, reputations: Map[PeerEndorsement, Reputation]): Behavior[Command] = { + Behaviors.receiveMessage { + case GetConfidence(replyTo, originNode, endorsement, relayId, fee) => + val (updatedReputation, confidence) = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).attempt(relayId, fee) + replyTo ! Confidence(confidence) + ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) + case CancelRelay(originNode, endorsement, relayId) => + val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).cancel(relayId) + ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) + case RecordResult(originNode, endorsement, relayId, isSuccess) => + val updatedReputation = reputations.getOrElse(PeerEndorsement(originNode, endorsement), Reputation.init(reputationConfig)).record(relayId, isSuccess) + ReputationRecorder(reputationConfig, reputations.updated(PeerEndorsement(originNode, endorsement), updatedReputation)) + case GetTrampolineConfidence(replyTo, fees, relayId) => + val (confidence, updatedReputations) = fees.foldLeft((1.0, reputations)) { + case ((c, r), (peerEndorsement, fee)) => + val (updatedReputation, confidence) = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).attempt(relayId, fee) + (c.min(confidence), r.updated(peerEndorsement, updatedReputation)) + } + replyTo ! Confidence(confidence) + ReputationRecorder(reputationConfig, updatedReputations) + case RecordTrampolineFailure(keys, relayId) => + val updatedReputations = keys.foldLeft(reputations) { + case (r, peerEndorsement) => + val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = false) + r.updated(peerEndorsement, updatedReputation) + } + ReputationRecorder(reputationConfig, updatedReputations) + case RecordTrampolineSuccess(fees, relayId) => + val updatedReputations = fees.foldLeft(reputations) { + case (r, (peerEndorsement, fee)) => + val updatedReputation = reputations.getOrElse(peerEndorsement, Reputation.init(reputationConfig)).record(relayId, isSuccess = true, Some(fee)) + r.updated(peerEndorsement, updatedReputation) + } + ReputationRecorder(reputationConfig, updatedReputations) + } + } +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 0b0483b437..2ea188fb76 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -29,6 +29,7 @@ import fr.acinq.eclair.io.MessageRelay.RelayAll import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection} import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} +import fr.acinq.eclair.reputation.Reputation import fr.acinq.eclair.router.Graph.{MessagePath, WeightRatios} import fr.acinq.eclair.router.PathFindingExperimentConf import fr.acinq.eclair.router.Router._ @@ -165,7 +166,9 @@ object TestConstants { feeBase = 548000 msat, feeProportionalMillionths = 30), enforcementDelay = 10 minutes, - asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144))), + asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)), + peerReputationConfig = Reputation.Config(enabled = false, 1 day, 10 seconds, 100), + ), db = TestDatabases.inMemoryDb(), autoReconnect = false, initialRandomReconnectDelay = 5 seconds, @@ -231,7 +234,7 @@ object TestConstants { maxAttempts = 2, ), purgeInvoicesInterval = None, - revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis) + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis), ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -335,7 +338,9 @@ object TestConstants { feeBase = 548000 msat, feeProportionalMillionths = 30), enforcementDelay = 10 minutes, - asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144))), + asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)), + peerReputationConfig = Reputation.Config(enabled = false, 2 day, 20 seconds, 200), + ), db = TestDatabases.inMemoryDb(), autoReconnect = false, initialRandomReconnectDelay = 5 seconds, @@ -401,7 +406,7 @@ object TestConstants { maxAttempts = 2, ), purgeInvoicesInterval = None, - revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis) + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis), ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala index dc25ecedc6..51c6f1b193 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/FuzzySpec.scala @@ -66,8 +66,8 @@ class FuzzySpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Channe val bobRegister = system.actorOf(Props(new TestRegister())) val alicePaymentHandler = system.actorOf(Props(new PaymentHandler(aliceParams, aliceRegister, TestProbe().ref))) val bobPaymentHandler = system.actorOf(Props(new PaymentHandler(bobParams, bobRegister, TestProbe().ref))) - val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref)) - val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref)) + val aliceRelayer = system.actorOf(Relayer.props(aliceParams, TestProbe().ref, aliceRegister, alicePaymentHandler, TestProbe().ref, None)) + val bobRelayer = system.actorOf(Relayer.props(bobParams, TestProbe().ref, bobRegister, bobPaymentHandler, TestProbe().ref, None)) val wallet = new DummyOnChainWallet() val alice: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(aliceParams, wallet, bobParams.nodeId, alice2blockchain.ref, aliceRelayer, FakeTxPublisherFactory(alice2blockchain)), alicePeer.ref) val bob: TestFSMRef[ChannelState, ChannelData, Channel] = TestFSMRef(new Channel(bobParams, wallet, aliceParams.nodeId, bob2blockchain.ref, bobRelayer, FakeTxPublisherFactory(bob2blockchain)), bobPeer.ref) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala index 1fcbada4a6..f9f0821b67 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala @@ -28,6 +28,7 @@ import fr.acinq.eclair.payment.offer.OfferManager import fr.acinq.eclair.payment.receive.{MultiPartHandler, PaymentHandler} import fr.acinq.eclair.payment.relay.{ChannelRelayer, PostRestartHtlcCleaner, Relayer} import fr.acinq.eclair.payment.send.PaymentInitiator +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.IPAddress import fr.acinq.eclair.{BlockHeight, MilliSatoshi, NodeParams, SubscriptionsComplete, TestBitcoinCoreClient, TestDatabases} @@ -96,7 +97,7 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat val router = system.actorOf(Router.props(nodeParams, watcherTyped), "router") val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager") val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler") - val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped), "relayer") + val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler, triggerer.ref.toTyped, None), "relayer") val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient) val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory) val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala index cd45aa1c7e..7e7333a4a7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala @@ -57,7 +57,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) { def createRelayer(nodeParams1: NodeParams): (ActorRef, ActorRef) = { - val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped)) + val relayer = system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref, TestProbe().ref.toTyped, None)) // we need ensure the post-htlc-restart child actor is initialized sender.send(relayer, Relayer.GetChildActors(sender.ref)) (relayer, sender.expectMsgType[Relayer.ChildActors].postRestartCleaner) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala index a25391031c..2ee7d8979f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/ChannelRelayerSpec.scala @@ -32,6 +32,7 @@ import fr.acinq.eclair.crypto.Sphinx import fr.acinq.eclair.payment.IncomingPaymentPacket.ChannelRelayPacket import fr.acinq.eclair.payment.relay.ChannelRelayer._ import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket, PaymentPacketSpec} +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router.Announcements import fr.acinq.eclair.wire.protocol.BlindedRouteData.PaymentRelayData import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload @@ -47,24 +48,26 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a import ChannelRelayerSpec._ - case class FixtureParam(nodeParams: NodeParams, channelRelayer: typed.ActorRef[ChannelRelayer.Command], register: TestProbe[Any]) + case class FixtureParam(nodeParams: NodeParams, channelRelayer: typed.ActorRef[ChannelRelayer.Command], register: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command]) override def withFixture(test: OneArgTest): Outcome = { // we are node B in the route A -> B -> C -> .... val nodeParams = TestConstants.Bob.nodeParams val register = TestProbe[Any]("register") - val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic)) + val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder") + val channelRelayer = testKit.spawn(ChannelRelayer.apply(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref))) try { - withFixture(test.toNoArgTest(FixtureParam(nodeParams, channelRelayer, register))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, channelRelayer, register, reputationRecorder))) } finally { testKit.stop(channelRelayer) } } - def expectFwdFail(register: TestProbe[Any], channelId: ByteVector32, cmd: channel.Command): Register.Forward[channel.Command] = { + def expectFwdFail(register: TestProbe[Any], channelId: ByteVector32, cmd: channel.Command, reputationRecorder: TestProbe[ReputationRecorder.Command]): Register.Forward[channel.Command] = { val fwd = register.expectMessageType[Register.Forward[channel.Command]] assert(fwd.message == cmd) assert(fwd.channelId == channelId) + reputationRecorder.expectMessageType[ReputationRecorder.CancelRelay] fwd } @@ -79,6 +82,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a fwd } + def setConfidence(f: FixtureParam)(value: Double): Unit = { + import f._ + + val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence] + assert(getConfidence.originNode == TestConstants.Alice.nodeParams.nodeId) + getConfidence.replyTo ! ReputationRecorder.Confidence(value) + } + def basicRelayTest(f: FixtureParam)(relayPayloadScid: ShortChannelId, lcu: LocalChannelUpdate, success: Boolean): Unit = { import f._ @@ -87,11 +98,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(lcu) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) if (success) { expectFwdAdd(register, lcu.channelId, outgoingAmount, outgoingExpiry, 7) } else { - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder) } } @@ -136,6 +148,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r1 = createValidIncomingPacket(payload1) channelRelayer ! WrappedLocalChannelUpdate(lcu1) channelRelayer ! Relay(r1, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) expectFwdAdd(register, lcu1.channelId, outgoingAmount, outgoingExpiry, 7) // reorg happens @@ -147,9 +160,11 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a // both old and new real scids work channelRelayer ! Relay(r1, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) expectFwdAdd(register, lcu1.channelId, outgoingAmount, outgoingExpiry, 7) // new real scid works channelRelayer ! Relay(r2, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) expectFwdAdd(register, lcu2.channelId, outgoingAmount, outgoingExpiry, 7) } @@ -162,6 +177,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) } @@ -181,6 +197,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u2) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // first try val fwd1 = expectFwdAdd(register, channelIds(realScid2), outgoingAmount, outgoingExpiry, 7) @@ -193,7 +210,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a fwd1.message.replyTo ! RES_ADD_FAILED(fwd2.message, HtlcValueTooHighInFlight(channelIds(realScid1), 1000000000 msat, 1516977616 msat), Some(u1.channelUpdate)) // the relayer should give up - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(u1.channelUpdate))), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(u1.channelUpdate))), commit = true), reputationRecorder) } test("fail to relay when we have no channel_update for the next channel") { f => @@ -203,8 +220,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r = createValidIncomingPacket(payload) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder) } test("fail to relay when register returns an error") { f => @@ -216,11 +234,12 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) fwd.replyTo ! Register.ForwardFailure(fwd) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder) } test("fail to relay when the channel is advertised as unusable (down)") { f => @@ -234,8 +253,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! WrappedLocalChannelDown(d) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(UnknownNextPeer()), commit = true), reputationRecorder) } test("fail to relay when channel is disabled") { f => @@ -247,8 +267,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(ChannelDisabled(u.channelUpdate.messageFlags, u.channelUpdate.channelFlags, Some(u.channelUpdate))), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(ChannelDisabled(u.channelUpdate.messageFlags, u.channelUpdate.channelFlags, Some(u.channelUpdate))), commit = true), reputationRecorder) } test("fail to relay when amount is below minimum") { f => @@ -260,8 +281,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(AmountBelowMinimum(outgoingAmount, Some(u.channelUpdate))), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(AmountBelowMinimum(outgoingAmount, Some(u.channelUpdate))), commit = true), reputationRecorder) } test("fail to relay blinded payment") { f => @@ -274,6 +296,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val cmd = register.expectMessageType[Register.Forward[channel.Command]] assert(cmd.channelId == r.add.channelId) @@ -290,6 +313,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a assert(fail.onionHash == Sphinx.hash(r.add.onionRoutingPacket)) assert(fail.failureCode == InvalidOnionBlinding(Sphinx.hash(r.add.onionRoutingPacket)).code) } + + reputationRecorder.expectMessageType[ReputationRecorder.CancelRelay] } } @@ -302,6 +327,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) expectFwdAdd(register, channelIds(realScid1), r.amountToForward, r.outgoingCltv, 7).message } @@ -315,8 +341,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(r.outgoingCltv, Some(u.channelUpdate))), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(r.outgoingCltv, Some(u.channelUpdate))), commit = true), reputationRecorder) } test("fail to relay when fee is insufficient") { f => @@ -328,8 +355,9 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u.channelUpdate))), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u.channelUpdate))), commit = true), reputationRecorder) } test("relay that would fail (fee insufficient) with a recent channel update but succeed with the previous update") { f => @@ -341,6 +369,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u1) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // relay succeeds with current channel update (u1) with lower fees expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) @@ -349,6 +378,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u2) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // relay succeeds because the current update (u2) with higher fees occurred less than 10 minutes ago expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) @@ -358,9 +388,10 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a channelRelayer ! WrappedLocalChannelUpdate(u1) channelRelayer ! WrappedLocalChannelUpdate(u3) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) // relay fails because the current update (u3) with higher fees occurred more than 10 minutes ago - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u3.channelUpdate))), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(FeeInsufficient(r.add.amountMsat, Some(u3.channelUpdate))), commit = true), reputationRecorder) } test("fail to relay when there is a local error") { f => @@ -387,9 +418,10 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) fwd.message.replyTo ! RES_ADD_FAILED(fwd.message, testCase.exc, Some(testCase.update)) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(testCase.failure), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(testCase.failure), commit = true), reputationRecorder) } } @@ -422,6 +454,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70), endorsementIn = 5) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(5.5 / 8) // select the channel to the same node, with the lowest capacity and balance but still high enough to handle the payment val cmd1 = expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, r.amountToForward, r.outgoingCltv, 5).message cmd1.replyTo ! RES_ADD_FAILED(cmd1, ChannelUnavailable(randomBytes32()), None) @@ -435,13 +468,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val cmd4 = expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, r.amountToForward, r.outgoingCltv, 5).message cmd4.replyTo ! RES_ADD_FAILED(cmd4, HtlcValueTooHighInFlight(randomBytes32(), 100000000 msat, 100000000 msat), Some(channelUpdates(ShortChannelId(11111)).channelUpdate)) // all the suitable channels have been tried - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true)) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(TemporaryChannelFailure(Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true), reputationRecorder) } { // higher amount payment (have to increased incoming htlc amount for fees to be sufficient) val payload = ChannelRelay.Standard(ShortChannelId(12345), 50000000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 60000000 msat, CltvExpiry(70), endorsementIn = 0) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(0) expectFwdAdd(register, channelUpdates(ShortChannelId(11111)).channelId, r.amountToForward, r.outgoingCltv, 0).message } { @@ -449,6 +483,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 1000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 60000000 msat, CltvExpiry(70), endorsementIn = 6) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(6.5 / 8) expectFwdAdd(register, channelUpdates(ShortChannelId(33333)).channelId, r.amountToForward, r.outgoingCltv, 6).message } { @@ -456,6 +491,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 1000000000 msat, CltvExpiry(60)) val r = createValidIncomingPacket(payload, 1010000000 msat, CltvExpiry(70)) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(7.5 / 8) expectFwdAdd(register, channelUpdates(ShortChannelId(12345)).channelId, r.amountToForward, r.outgoingCltv, 7).message } { @@ -463,6 +499,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(50)) val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70)) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(7.5 / 8) expectFwdAdd(register, channelUpdates(ShortChannelId(22223)).channelId, r.amountToForward, r.outgoingCltv, 7).message } { @@ -470,7 +507,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val payload = ChannelRelay.Standard(ShortChannelId(12345), 998900 msat, CltvExpiry(61)) val r = createValidIncomingPacket(payload, 1000000 msat, CltvExpiry(70)) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) - expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(CltvExpiry(61), Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true)) + setConfidence(f)(0.2) + expectFwdFail(register, r.add.channelId, CMD_FAIL_HTLC(r.add.id, Right(IncorrectCltvExpiry(CltvExpiry(61), Some(channelUpdates(ShortChannelId(12345)).channelUpdate))), commit = true), reputationRecorder) } } @@ -496,10 +534,14 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(1.0) val fwd = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 7) fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1) fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream_htlc, testCase.result) - expectFwdFail(register, r.add.channelId, testCase.cmd) + val fwdFail = register.expectMessageType[Register.Forward[channel.Command]] + assert(fwdFail.message == testCase.cmd) + assert(fwdFail.channelId == r.add.channelId) + assert(!reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess) } } @@ -522,6 +564,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val r = createValidIncomingPacket(createBlindedPayload(u.channelUpdate, isIntroduction), outgoingAmount + u.channelUpdate.feeBaseMsat, outgoingExpiry + u.channelUpdate.cltvExpiryDelta, endorsementIn = 0) channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(0) val fwd = expectFwdAdd(register, channelId1, outgoingAmount, outgoingExpiry, 0) fwd.message.replyTo ! RES_SUCCESS(fwd.message, channelId1) fwd.message.origin.replyTo ! RES_ADD_SETTLED(fwd.message.origin, downstream, htlcResult) @@ -540,6 +583,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a assert(fail.onionHash == Sphinx.hash(r.add.onionRoutingPacket)) assert(fail.failureCode == InvalidOnionBlinding(Sphinx.hash(r.add.onionRoutingPacket)).code) } + assert(!reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess) } } } @@ -565,6 +609,7 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a testCases.foreach { testCase => channelRelayer ! WrappedLocalChannelUpdate(u) channelRelayer ! Relay(r, TestConstants.Alice.nodeParams.nodeId) + setConfidence(f)(3.5 / 8) val fwd1 = expectFwdAdd(register, channelIds(realScid1), outgoingAmount, outgoingExpiry, 3) fwd1.message.replyTo ! RES_SUCCESS(fwd1.message, channelId1) @@ -577,6 +622,8 @@ class ChannelRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("a val paymentRelayed = eventListener.expectMessageType[ChannelPaymentRelayed] assert(paymentRelayed.copy(startedAt = 0 unixms, settledAt = 0 unixms) == ChannelPaymentRelayed(r.add.amountMsat, r.amountToForward, r.add.paymentHash, r.add.channelId, channelId1, startedAt = 0 unixms, settledAt = 0 unixms)) + + assert(reputationRecorder.expectMessageType[ReputationRecorder.RecordResult].isSuccess) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala index 64b270f4cb..8ee310ab6d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/NodeRelayerSpec.scala @@ -41,6 +41,8 @@ import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.{PreimageReceived, import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPaymentToNode import fr.acinq.eclair.payment.send.{BlindedRecipient, ClearRecipient} +import fr.acinq.eclair.reputation.ReputationRecorder +import fr.acinq.eclair.reputation.ReputationRecorder.{Confidence, GetTrampolineConfidence, RecordTrampolineFailure, RecordTrampolineSuccess} import fr.acinq.eclair.router.Router.RouteRequest import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound, Router} import fr.acinq.eclair.wire.protocol.OfferTypes._ @@ -65,11 +67,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl import NodeRelayerSpec._ - case class FixtureParam(nodeParams: NodeParams, router: TestProbe[Any], register: TestProbe[Any], mockPayFSM: TestProbe[Any], eventListener: TestProbe[PaymentEvent], triggerer: TestProbe[AsyncPaymentTriggerer.Command]) { + case class FixtureParam(nodeParams: NodeParams, router: TestProbe[Any], register: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command], mockPayFSM: TestProbe[Any], eventListener: TestProbe[PaymentEvent], triggerer: TestProbe[AsyncPaymentTriggerer.Command]) { def createNodeRelay(packetIn: IncomingPaymentPacket.NodeRelayPacket, useRealPaymentFactory: Boolean = false): (ActorRef[NodeRelay.Command], TestProbe[NodeRelayer.Command]) = { val parent = TestProbe[NodeRelayer.Command]("parent-relayer") val outgoingPaymentFactory = if (useRealPaymentFactory) RealOutgoingPaymentFactory(this) else FakeOutgoingPaymentFactory(this) - val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, relayId, packetIn, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) + val nodeRelay = testKit.spawn(NodeRelay(nodeParams, parent.ref, register.ref.toClassic, Some(reputationRecorder.ref), relayId, packetIn, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) (nodeRelay, parent) } } @@ -96,17 +98,18 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl .modify(_.relayParams.asyncPaymentsParams.holdTimeoutBlocks).setToIf(test.tags.contains("long_hold_timeout"))(200000) // timeout after payment expires val router = TestProbe[Any]("router") val register = TestProbe[Any]("register") + val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder") val eventListener = TestProbe[PaymentEvent]("event-listener") system.eventStream ! EventStream.Subscribe(eventListener.ref) val mockPayFSM = TestProbe[Any]("pay-fsm") val triggerer = TestProbe[AsyncPaymentTriggerer.Command]("payment-triggerer") - withFixture(test.toNoArgTest(FixtureParam(nodeParams, router, register, mockPayFSM, eventListener, triggerer))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, router, register, reputationRecorder, mockPayFSM, eventListener, triggerer))) } test("create child handlers for new payments") { f => import f._ val probe = TestProbe[Any]() - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, FakeOutgoingPaymentFactory(f), triggerer.ref, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), FakeOutgoingPaymentFactory(f), triggerer.ref, router.ref.toClassic)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(Map.empty) @@ -145,7 +148,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val outgoingPaymentFactory = FakeOutgoingPaymentFactory(f) { - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(Map.empty) } @@ -153,7 +156,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (paymentHash1, paymentSecret1, child1) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]()) val (paymentHash2, paymentSecret2, child2) = (randomBytes32(), randomBytes32(), TestProbe[NodeRelay.Command]()) val children = Map(PaymentKey(paymentHash1, paymentSecret1) -> child1.ref, PaymentKey(paymentHash2, paymentSecret2) -> child2.ref) - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(children) @@ -169,7 +172,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (paymentSecret1, child1) = (randomBytes32(), TestProbe[NodeRelay.Command]()) val (paymentSecret2, child2) = (randomBytes32(), TestProbe[NodeRelay.Command]()) val children = Map(PaymentKey(paymentHash, paymentSecret1) -> child1.ref, PaymentKey(paymentHash, paymentSecret2) -> child2.ref) - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic, children)) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) probe.expectMessage(children) @@ -179,7 +182,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl probe.expectMessage(Map(PaymentKey(paymentHash, paymentSecret2) -> child2.ref)) } { - val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) + val parentRelayer = testKit.spawn(NodeRelayer(nodeParams, register.ref.toClassic, Some(reputationRecorder.ref), outgoingPaymentFactory, triggerer.ref, router.ref.toClassic)) parentRelayer ! NodeRelayer.Relay(incomingMultiPart.head, randomKey().publicKey) parentRelayer ! NodeRelayer.GetPendingPayments(probe.ref.toClassic) val pending1 = probe.expectMessageType[Map[PaymentKey, ActorRef[NodeRelay.Command]]] @@ -231,6 +234,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl nextTrampolinePacket) nodeRelayer ! NodeRelay.Relay(extra, randomKey().publicKey) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + // the extra payment will be rejected val fwd = register.expectMessageType[Register.Forward[CMD_FAIL_HTLC]] assert(fwd.channelId == extra.add.channelId) @@ -247,8 +252,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl // Receive a complete upstream multi-part payment, which we relay out. incomingMultiPart.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(3.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 3) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) @@ -375,9 +382,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(peerWatch.timeout == asyncSafetyHeight(incomingAsyncPayment, nodeParams)) peerWatch.replyTo ! AsyncPaymentTriggered + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(4.5 / 8) + // upstream payment relayed val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 4) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) // those are adapters for pay-fsm messages @@ -398,6 +407,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingAsyncPayment.map(i => (i.add.amountMsat, i.add.channelId)).toSet) assert(relayEvent.outgoing.nonEmpty) parent.expectMessageType[NodeRelayer.RelayComplete] + reputationRecorder.expectMessageType[RecordTrampolineSuccess] register.expectNoMessage(100 millis) } @@ -452,9 +462,11 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = createNodeRelay(incomingAsyncPayment.head) incomingAsyncPayment.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(6.5 / 8) + // upstream payment relayed val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingAsyncPayment.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 6) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) // those are adapters for pay-fsm messages @@ -475,6 +487,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingAsyncPayment.map(i => (i.add.amountMsat, i.add.channelId)).toSet) assert(relayEvent.outgoing.nonEmpty) parent.expectMessageType[NodeRelayer.RelayComplete] + reputationRecorder.expectMessageType[RecordTrampolineSuccess] register.expectNoMessage(100 millis) } @@ -551,6 +564,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, _) = f.createNodeRelay(incomingMultiPart.head) incomingMultiPart.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + mockPayFSM.expectMessageType[SendPaymentConfig] // those are adapters for pay-fsm messages val nodeRelayerAdapters = mockPayFSM.expectMessageType[SendMultiPartPayment].replyTo @@ -563,6 +578,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient()), commit = true)) } + reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) eventListener.expectNoMessage(100 millis) } @@ -578,6 +594,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, _) = f.createNodeRelay(incoming.head, useRealPaymentFactory = true) incoming.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef] router.expectMessageType[RouteRequest] payFSM ! Status.Failure(BalanceTooLow) @@ -588,6 +606,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TemporaryNodeFailure()), commit = true)) } + reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) } @@ -598,6 +617,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, _) = f.createNodeRelay(incomingMultiPart.head, useRealPaymentFactory = true) incomingMultiPart.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef] router.expectMessageType[RouteRequest] @@ -611,6 +632,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(TrampolineFeeInsufficient()), commit = true)) } + reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) } @@ -621,6 +643,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, _) = f.createNodeRelay(incomingMultiPart.head, useRealPaymentFactory = true) incomingMultiPart.foreach(p => nodeRelayer ! NodeRelay.Relay(p, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val payFSM = mockPayFSM.expectMessageType[akka.actor.ActorRef] router.expectMessageType[RouteRequest] @@ -633,6 +657,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl assert(fwd.message == CMD_FAIL_HTLC(p.add.id, Right(FinalIncorrectHtlcAmount(42 msat)), commit = true)) } + reputationRecorder.expectMessageType[RecordTrampolineFailure] register.expectNoMessage(100 millis) } @@ -643,6 +668,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, _) = f.createNodeRelay(incomingSinglePart, useRealPaymentFactory = true) nodeRelayer ! NodeRelay.Relay(incomingSinglePart, randomKey().publicKey) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val routeRequest = router.expectMessageType[RouteRequest] val routeParams = routeRequest.routeParams assert(routeParams.boundaries.maxFeeProportional == 0) // should be disabled @@ -661,8 +688,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl nodeRelayer ! NodeRelay.Relay(incomingMultiPart.last, randomKey().publicKey) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(3.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 3) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] validateOutgoingPayment(outgoingPayment) // those are adapters for pay-fsm messages @@ -686,6 +715,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)).toSet == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId)).toSet) assert(relayEvent.outgoing.nonEmpty) + reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } @@ -697,6 +727,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingSinglePart) nodeRelayer ! NodeRelay.Relay(incomingSinglePart, randomKey().publicKey) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(Upstream.Hot.Channel(incomingSinglePart.add, TimestampMilli.now(), randomKey().publicKey) :: Nil), 7) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] @@ -715,6 +747,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == Seq((incomingSinglePart.add.amountMsat, incomingSinglePart.add.channelId))) assert(relayEvent.outgoing.nonEmpty) + reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } @@ -732,8 +765,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(2.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 2) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] assert(outgoingPayment.recipient.nodeId == outgoingNodeId) assert(outgoingPayment.recipient.totalAmount == outgoingAmount) @@ -759,6 +794,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId))) assert(relayEvent.outgoing.nonEmpty) + reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } @@ -776,8 +812,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(0) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 0) val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode] assert(outgoingPayment.recipient.nodeId == outgoingNodeId) assert(outgoingPayment.amount == outgoingAmount) @@ -803,6 +841,7 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl validateRelayEvent(relayEvent) assert(relayEvent.incoming.map(p => (p.amount, p.channelId)) == incomingMultiPart.map(i => (i.add.amountMsat, i.add.channelId))) assert(relayEvent.outgoing.length == 1) + reputationRecorder.expectMessageType[RecordTrampolineSuccess] parent.expectMessageType[NodeRelayer.RelayComplete] register.expectNoMessage(100 millis) } @@ -845,8 +884,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5, ignoreNodeId = true) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 7, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode] assert(outgoingPayment.amount == outgoingAmount) assert(outgoingPayment.recipient.expiry == outgoingExpiry) @@ -884,8 +925,10 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.5 / 8) + val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5, ignoreNodeId = true) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 1, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendMultiPartPayment] assert(outgoingPayment.recipient.totalAmount == outgoingAmount) assert(outgoingPayment.recipient.expiry == outgoingExpiry) @@ -926,13 +969,15 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, parent) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(4.5 / 8) + val getNodeId = router.expectMessageType[Router.GetNodeId] assert(getNodeId.isNode1 == scidDir.isNode1) assert(getNodeId.shortChannelId == scidDir.scid) getNodeId.replyTo ! Some(outgoingNodeId) val outgoingCfg = mockPayFSM.expectMessageType[SendPaymentConfig] - validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 5, ignoreNodeId = true) + validateOutgoingCfg(outgoingCfg, Upstream.Hot.Trampoline(incomingMultiPart.map(p => Upstream.Hot.Channel(p.add, TimestampMilli.now(), randomKey().publicKey))), 4, ignoreNodeId = true) val outgoingPayment = mockPayFSM.expectMessageType[SendPaymentToNode] assert(outgoingPayment.amount == outgoingAmount) assert(outgoingPayment.recipient.expiry == outgoingExpiry) @@ -973,6 +1018,8 @@ class NodeRelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("appl val (nodeRelayer, _) = f.createNodeRelay(incomingPayments.head) incomingPayments.foreach(incoming => nodeRelayer ! NodeRelay.Relay(incoming, randomKey().publicKey)) + reputationRecorder.expectMessageType[GetTrampolineConfidence].replyTo ! Confidence(1.0) + val getNodeId = router.expectMessageType[Router.GetNodeId] assert(getNodeId.isNode1 == scidDir.isNode1) assert(getNodeId.shortChannelId == scidDir.scid) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala index 8e68b899e3..2d5c2aadce 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/relay/RelayerSpec.scala @@ -33,6 +33,7 @@ import fr.acinq.eclair.payment.OutgoingPaymentPacket.{NodePayload, buildOnion, b import fr.acinq.eclair.payment.PaymentPacketSpec._ import fr.acinq.eclair.payment.relay.Relayer._ import fr.acinq.eclair.payment.send.{ClearRecipient, TrampolineRecipient} +import fr.acinq.eclair.reputation.ReputationRecorder import fr.acinq.eclair.router.BaseRouterSpec.{blindedRouteFromHops, channelHopFromUpdate} import fr.acinq.eclair.router.Router.{NodeHop, Route} import fr.acinq.eclair.wire.protocol.PaymentOnion.FinalPayload @@ -46,7 +47,7 @@ import scala.concurrent.duration.DurationInt class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike { - case class FixtureParam(nodeParams: NodeParams, relayer: akka.actor.ActorRef, router: TestProbe[Any], register: TestProbe[Any], childActors: ChildActors, paymentHandler: TestProbe[Any]) + case class FixtureParam(nodeParams: NodeParams, relayer: akka.actor.ActorRef, router: TestProbe[Any], register: TestProbe[Any], childActors: ChildActors, paymentHandler: TestProbe[Any], reputationRecorder: TestProbe[ReputationRecorder.Command]) override def withFixture(test: OneArgTest): Outcome = { // we are node B in the route A -> B -> C -> .... @@ -56,17 +57,26 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat val register = TestProbe[Any]("register") val paymentHandler = TestProbe[Any]("payment-handler") val triggerer = TestProbe[AsyncPaymentTriggerer.Command]("payment-triggerer") + val reputationRecorder = TestProbe[ReputationRecorder.Command]("reputation-recorder") val probe = TestProbe[Any]() // we can't spawn top-level actors with akka typed testKit.spawn(Behaviors.setup[Any] { context => - val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic, triggerer.ref)) + val relayer = context.toClassic.actorOf(Relayer.props(nodeParams, router.ref.toClassic, register.ref.toClassic, paymentHandler.ref.toClassic, triggerer.ref, Some(reputationRecorder.ref))) probe.ref ! relayer Behaviors.empty[Any] }) val relayer = probe.expectMessageType[akka.actor.ActorRef] relayer ! GetChildActors(probe.ref.toClassic) val childActors = probe.expectMessageType[ChildActors] - withFixture(test.toNoArgTest(FixtureParam(nodeParams, relayer, router, register, childActors, paymentHandler))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, relayer, router, register, childActors, paymentHandler, reputationRecorder))) + } + + def setConfidence(f: FixtureParam)(value: Double): Unit = { + import f._ + + val getConfidence = reputationRecorder.expectMessageType[ReputationRecorder.GetConfidence] + assert(getConfidence.originNode == TestConstants.Alice.nodeParams.nodeId) + getConfidence.replyTo ! ReputationRecorder.Confidence(value) } val channelId_ab = randomBytes32() @@ -94,6 +104,7 @@ class RelayerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("applicat // and then manually build an htlc val add_ab = UpdateAddHtlc(randomBytes32(), 123456, payment.cmd.amount, payment.cmd.paymentHash, payment.cmd.cltvExpiry, payment.cmd.onion, None, 1.0) relayer ! RelayForward(add_ab, priv_a.publicKey) + setConfidence(f)(1.0) register.expectMessageType[Register.Forward[CMD_ADD_HTLC]] } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala new file mode 100644 index 0000000000..7228b3372d --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationRecorderSpec.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import akka.actor.typed.ActorRef +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.eclair.reputation.ReputationRecorder._ +import fr.acinq.eclair.{MilliSatoshiLong, randomKey} +import org.scalatest.Outcome +import org.scalatest.funsuite.FixtureAnyFunSuiteLike + +import java.util.UUID +import scala.concurrent.duration.DurationInt + +class ReputationRecorderSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike { + val (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6, uuid7, uuid8) = (UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()) + val originNode: PublicKey = randomKey().publicKey + + case class FixtureParam(config: Reputation.Config, reputationRecorder: ActorRef[Command], replyTo: TestProbe[Confidence]) + + override def withFixture(test: OneArgTest): Outcome = { + val config = Reputation.Config(enabled = true, 1 day, 10 seconds, 2) + val replyTo = TestProbe[Confidence]("confidence") + val reputationRecorder = testKit.spawn(ReputationRecorder(config, Map.empty)) + withFixture(test.toNoArgTest(FixtureParam(config, reputationRecorder.ref, replyTo))) + } + + test("channel relay") { f => + import f._ + + reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid1, 2000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + reputationRecorder ! RecordResult(originNode, 7, uuid1, isSuccess = true) + reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid2, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (2.0 / 4) +- 0.001) + reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid3, 3000 msat) + assert(replyTo.expectMessageType[Confidence].value === (2.0 / 10) +- 0.001) + reputationRecorder ! CancelRelay(originNode, 7, uuid3) + reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid4, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (2.0 / 6) +- 0.001) + reputationRecorder ! RecordResult(originNode, 7, uuid4, isSuccess = true) + reputationRecorder ! RecordResult(originNode, 7, uuid2, isSuccess = false) + // Not endorsed + reputationRecorder ! GetConfidence(replyTo.ref, originNode, 0, uuid5, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + // Different origin node + reputationRecorder ! GetConfidence(replyTo.ref, randomKey().publicKey, 7, uuid6, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0) + // Very large HTLC + reputationRecorder ! GetConfidence(replyTo.ref, originNode, 7, uuid5, 100000000 msat) + assert(replyTo.expectMessageType[Confidence].value === 0.0 +- 0.001) + } + + test("trampoline relay") { f => + import f._ + + val (a, b, c) = (randomKey().publicKey, randomKey().publicKey, randomKey().publicKey) + + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map(PeerEndorsement(a, 7) -> 2000.msat, PeerEndorsement(b, 7) -> 4000.msat, PeerEndorsement(c, 0) -> 6000.msat), uuid1) + assert(replyTo.expectMessageType[Confidence].value == 0) + reputationRecorder ! RecordTrampolineSuccess(Map(PeerEndorsement(a, 7) -> 1000.msat, PeerEndorsement(b, 7) -> 2000.msat, PeerEndorsement(c, 0) -> 3000.msat), uuid1) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map(PeerEndorsement(a, 7) -> 1000.msat, PeerEndorsement(c, 0) -> 1000.msat), uuid2) + assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001) + reputationRecorder ! GetTrampolineConfidence(replyTo.ref, Map(PeerEndorsement(a, 0) -> 1000.msat, PeerEndorsement(b, 7) -> 2000.msat), uuid3) + assert(replyTo.expectMessageType[Confidence].value == 0) + reputationRecorder ! RecordTrampolineFailure(Set(PeerEndorsement(a, 7), PeerEndorsement(c, 0)), uuid2) + reputationRecorder ! RecordTrampolineSuccess(Map(PeerEndorsement(a, 0) -> 1000.msat, PeerEndorsement(b, 7) -> 2000.msat), uuid3) + + reputationRecorder ! GetConfidence(replyTo.ref, a, 7, uuid4, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (1.0 / 4) +- 0.001) + reputationRecorder ! GetConfidence(replyTo.ref, a, 0, uuid5, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (1.0 / 3) +- 0.001) + reputationRecorder ! GetConfidence(replyTo.ref, b, 7, uuid6, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (4.0 / 6) +- 0.001) + reputationRecorder ! GetConfidence(replyTo.ref, b, 0, uuid7, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value == 0.0) + reputationRecorder ! GetConfidence(replyTo.ref, c, 0, uuid8, 1000 msat) + assert(replyTo.expectMessageType[Confidence].value === (3.0 / 6) +- 0.001) + } +} \ No newline at end of file diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala new file mode 100644 index 0000000000..4ee8042ad6 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/reputation/ReputationSpec.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.reputation + +import fr.acinq.eclair.reputation.Reputation._ +import fr.acinq.eclair.{MilliSatoshiLong, TimestampMilli} +import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper +import org.scalatest.funsuite.AnyFunSuite + +import java.util.UUID +import scala.concurrent.duration.DurationInt + +class ReputationSpec extends AnyFunSuite { + val (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6, uuid7) = (UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()) + + test("basic") { + val r0 = Reputation.init(Config(enabled = true, 1 day, 1 second, 2)) + val (r1, c1) = r0.attempt(uuid1, 10000 msat) + assert(c1 == 0) + val r2 = r1.record(uuid1, isSuccess = true) + val (r3, c3) = r2.attempt(uuid2, 10000 msat) + assert(c3 === (1.0 / 3) +- 0.001) + val (r4, c4) = r3.attempt(uuid3, 10000 msat) + assert(c4 === (1.0 / 5) +- 0.001) + val r5 = r4.record(uuid2, isSuccess = true) + val r6 = r5.record(uuid3, isSuccess = true) + val (r7, c7) = r6.attempt(uuid4, 1 msat) + assert(c7 === 1.0 +- 0.001) + val (r8, c8) = r7.attempt(uuid5, 40000 msat) + assert(c8 === (3.0 / 11) +- 0.001) + val (r9, c9) = r8.attempt(uuid6, 10000 msat) + assert(c9 === (3.0 / 13) +- 0.001) + val r10 = r9.cancel(uuid5) + val r11 = r10.record(uuid6, isSuccess = false) + val (_, c12) = r11.attempt(uuid7, 10000 msat) + assert(c12 === (3.0 / 6) +- 0.001) + } + + test("long HTLC") { + val r0 = Reputation.init(Config(enabled = true, 1000 day, 1 second, 10)) + val (r1, c1) = r0.attempt(uuid1, 100000 msat, TimestampMilli(0)) + assert(c1 == 0) + val r2 = r1.record(uuid1, isSuccess = true, now = TimestampMilli(0)) + val (r3, c3) = r2.attempt(uuid2, 1000 msat, TimestampMilli(0)) + assert(c3 === (10.0 / 11) +- 0.001) + val r4 = r3.record(uuid2, isSuccess = false, now = TimestampMilli(0) + 100.seconds) + val (_, c5) = r4.attempt(uuid3, 0 msat, now = TimestampMilli(0) + 100.seconds) + assert(c5 === 0.5 +- 0.001) + } + + test("exponential decay") { + val r0 = Reputation.init(Config(enabled = true, 100 seconds, 1 second, 1)) + val (r1, _) = r0.attempt(uuid1, 1000 msat, TimestampMilli(0)) + val r2 = r1.record(uuid1, isSuccess = true, now = TimestampMilli(0)) + val (r3, c3) = r2.attempt(uuid2, 1000 msat, TimestampMilli(0)) + assert(c3 == 1.0 / 2) + val r4 = r3.record(uuid2, isSuccess = true, now = TimestampMilli(0)) + val (r5, c5) = r4.attempt(uuid3, 1000 msat, TimestampMilli(0)) + assert(c5 == 2.0 / 3) + val r6 = r5.record(uuid3, isSuccess = true, now = TimestampMilli(0)) + val (r7, c7) = r6.attempt(uuid4, 1000 msat, TimestampMilli(0) + 100.seconds) + assert(c7 == 1.5 / 2.5) + val r8 = r7.cancel(uuid4) + val (_, c9) = r8.attempt(uuid5, 1000 msat, TimestampMilli(0) + 1.hour) + assert(c9 < 0.000001) + } +} \ No newline at end of file