Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multipart FSM v2 #1439

Merged
merged 9 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ eclair {
gossip_queries = optional
gossip_queries_ex = optional
var_onion_optin = optional
payment_secret = optional
basic_mpp = optional
}
override-features = [ // optional per-node features
# {
Expand Down
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class Setup(datadir: File,
switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, watcher, relayer, wallet), "switchboard", SupervisorStrategy.Resume))
clientSpawner = system.actorOf(SimpleSupervisor.props(ClientSpawner.props(nodeParams, switchboard, router), "client-spawner", SupervisorStrategy.Restart))
server = system.actorOf(SimpleSupervisor.props(Server.props(nodeParams, switchboard, router, serverBindingAddress, Some(tcpBound)), "server", SupervisorStrategy.Restart))
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, router, relayer, register), "payment-initiator", SupervisorStrategy.Restart))
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, router, register), "payment-initiator", SupervisorStrategy.Restart))
_ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart))

kit = Kit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ object Monitoring {
val PaymentParts = Kamon.histogram("payment.parts", "Number of HTLCs per payment (MPP)")
val PaymentFailed = Kamon.counter("payment.failed", "Number of failed payment")
val PaymentError = Kamon.counter("payment.error", "Non-fatal errors encountered during payment attempts")
val PaymentAttempt = Kamon.histogram("payment.attempt", "Number of attempts before a payment succeeds")
val SentPaymentDuration = Kamon.timer("payment.duration.sent", "Outgoing payment duration")
val ReceivedPaymentDuration = Kamon.timer("payment.duration.received", "Incoming payment duration")

// The goal of this metric is to measure whether retrying MPP payments on failing channels yields useful results.
// Once enough data has been collected, we will update the MultiPartPaymentLifecycle logic accordingly.
val RetryFailedChannelsResult = Kamon.counter("payment.mpp.retry-failed-channels-result")

def recordPaymentRelayFailed(failureType: String, relayType: String): Unit =
Metrics.PaymentFailed
.withTag(Tags.Direction, Tags.Directions.Relayed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.MilliSatoshi
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Hop}
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Hop, Ignore}
import fr.acinq.eclair.wire.Node

/**
Expand Down Expand Up @@ -162,43 +162,43 @@ object PaymentFailure {
.isDefined

/** Update the set of nodes and channels to ignore in retries depending on the failure we received. */
def updateIgnored(failure: PaymentFailure, ignoreNodes: Set[PublicKey], ignoreChannels: Set[ChannelDesc]): (Set[PublicKey], Set[ChannelDesc]) = failure match {
def updateIgnored(failure: PaymentFailure, ignore: Ignore): Ignore = failure match {
case RemoteFailure(hops, Sphinx.DecryptedFailurePacket(nodeId, _)) if nodeId == hops.last.nextNodeId =>
// The failure came from the final recipient: the payment should be aborted without penalizing anyone in the route.
(ignoreNodes, ignoreChannels)
ignore
case RemoteFailure(_, Sphinx.DecryptedFailurePacket(nodeId, _: Node)) =>
(ignoreNodes + nodeId, ignoreChannels)
ignore + nodeId
case RemoteFailure(_, Sphinx.DecryptedFailurePacket(nodeId, failureMessage: Update)) =>
if (Announcements.checkSig(failureMessage.update, nodeId)) {
// We were using an outdated channel update, we should retry with the new one and nobody should be penalized.
(ignoreNodes, ignoreChannels)
ignore
} else {
// This node is fishy, it gave us a bad signature, so let's filter it out.
(ignoreNodes + nodeId, ignoreChannels)
ignore + nodeId
}
case RemoteFailure(hops, Sphinx.DecryptedFailurePacket(nodeId, _)) =>
// Let's ignore the channel outgoing from nodeId.
hops.collectFirst {
case hop: ChannelHop if hop.nodeId == nodeId => ChannelDesc(hop.lastUpdate.shortChannelId, hop.nodeId, hop.nextNodeId)
} match {
case Some(faultyChannel) => (ignoreNodes, ignoreChannels + faultyChannel)
case None => (ignoreNodes, ignoreChannels)
case Some(faultyChannel) => ignore + faultyChannel
case None => ignore
}
case UnreadableRemoteFailure(hops) =>
// We don't know which node is sending garbage, let's blacklist all nodes except the one we are directly connected to and the final recipient.
val blacklist = hops.map(_.nextNodeId).drop(1).dropRight(1)
(ignoreNodes ++ blacklist, ignoreChannels)
val blacklist = hops.map(_.nextNodeId).drop(1).dropRight(1).toSet
ignore ++ blacklist
case LocalFailure(hops, _) => hops.headOption match {
case Some(hop: ChannelHop) =>
val faultyChannel = ChannelDesc(hop.lastUpdate.shortChannelId, hop.nodeId, hop.nextNodeId)
(ignoreNodes, ignoreChannels + faultyChannel)
case _ => (ignoreNodes, ignoreChannels)
ignore + faultyChannel
case _ => ignore
}
}

/** Update the set of nodes and channels to ignore in retries depending on the failures we received. */
def updateIgnored(failures: Seq[PaymentFailure], ignoreNodes: Set[PublicKey], ignoreChannels: Set[ChannelDesc]): (Set[PublicKey], Set[ChannelDesc]) = {
failures.foldLeft((ignoreNodes, ignoreChannels)) { case ((nodes, channels), failure) => updateIgnored(failure, nodes, channels) }
def updateIgnored(failures: Seq[PaymentFailure], ignore: Ignore): Ignore = {
failures.foldLeft(ignore) { case (current, failure) => updateIgnored(failure, current) }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import fr.acinq.eclair.payment.receive.MultiPartPaymentFSM
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.SendMultiPartPayment
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
import fr.acinq.eclair.payment.send.PaymentLifecycle.SendPayment
import fr.acinq.eclair.payment.send.{MultiPartPaymentLifecycle, PaymentError, PaymentLifecycle}
import fr.acinq.eclair.payment.send.{MultiPartPaymentLifecycle, PaymentLifecycle}
import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.router.{RouteCalculation, RouteNotFound}
import fr.acinq.eclair.router.{BalanceTooLow, RouteCalculation, RouteNotFound}
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{CltvExpiry, Logs, MilliSatoshi, NodeParams, nodeFee, randomBytes32}
import fr.acinq.eclair.{CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, nodeFee, randomBytes32}

import scala.collection.immutable.Queue

Expand All @@ -46,7 +46,7 @@ import scala.collection.immutable.Queue
* It aggregates incoming HTLCs (in case multi-part was used upstream) and then forwards the requested amount (using the
* router to find a route to the remote node and potentially splitting the payment using multi-part).
*/
class NodeRelayer(nodeParams: NodeParams, relayer: ActorRef, router: ActorRef, commandBuffer: ActorRef, register: ActorRef) extends Actor with DiagnosticActorLogging {
class NodeRelayer(nodeParams: NodeParams, router: ActorRef, commandBuffer: ActorRef, register: ActorRef) extends Actor with DiagnosticActorLogging {

import NodeRelayer._

Expand Down Expand Up @@ -147,7 +147,7 @@ class NodeRelayer(nodeParams: NodeParams, relayer: ActorRef, router: ActorRef, c

def spawnOutgoingPayFSM(cfg: SendPaymentConfig, multiPart: Boolean): ActorRef = {
if (multiPart) {
context.actorOf(MultiPartPaymentLifecycle.props(nodeParams, cfg, relayer, router, register))
context.actorOf(MultiPartPaymentLifecycle.props(nodeParams, cfg, router, register))
} else {
context.actorOf(PaymentLifecycle.props(nodeParams, cfg, router, register))
}
Expand All @@ -157,15 +157,21 @@ class NodeRelayer(nodeParams: NodeParams, relayer: ActorRef, router: ActorRef, c
val paymentId = UUID.randomUUID()
val paymentCfg = SendPaymentConfig(paymentId, paymentId, None, paymentHash, payloadOut.amountToForward, payloadOut.outgoingNodeId, upstream, None, storeInDb = false, publishEvent = false, Nil)
val routeParams = computeRouteParams(nodeParams, upstream.amountIn, upstream.expiryIn, payloadOut.amountToForward, payloadOut.outgoingCltv)
// If invoice features are provided in the onion, the sender is asking us to relay to a non-trampoline recipient.
payloadOut.invoiceFeatures match {
case Some(_) =>
log.debug("relaying trampoline payment to non-trampoline recipient")
case Some(features) =>
val routingHints = payloadOut.invoiceRoutingInfo.map(_.map(_.toSeq).toSeq).getOrElse(Nil)
// TODO: @t-bast: MPP is disabled for trampoline to non-trampoline payments until we improve the splitting algorithm for nodes with a lot of channels.
val payFSM = spawnOutgoingPayFSM(paymentCfg, multiPart = false)
val finalPayload = Onion.createSinglePartPayload(payloadOut.amountToForward, payloadOut.outgoingCltv, payloadOut.paymentSecret)
val payment = SendPayment(payloadOut.outgoingNodeId, finalPayload, nodeParams.maxPaymentAttempts, routingHints, Some(routeParams))
payFSM ! payment
payloadOut.paymentSecret match {
case Some(paymentSecret) if Features(features).hasFeature(Features.BasicMultiPartPayment) =>
log.debug("relaying trampoline payment to non-trampoline recipient using MPP")
val payment = SendMultiPartPayment(paymentSecret, payloadOut.outgoingNodeId, payloadOut.amountToForward, payloadOut.outgoingCltv, nodeParams.maxPaymentAttempts, routingHints, Some(routeParams))
spawnOutgoingPayFSM(paymentCfg, multiPart = true) ! payment
case _ =>
log.debug("relaying trampoline payment to non-trampoline recipient without MPP")
val finalPayload = Onion.createSinglePartPayload(payloadOut.amountToForward, payloadOut.outgoingCltv, payloadOut.paymentSecret)
val payment = SendPayment(payloadOut.outgoingNodeId, finalPayload, nodeParams.maxPaymentAttempts, routingHints, Some(routeParams))
spawnOutgoingPayFSM(paymentCfg, multiPart = false) ! payment
}
case None =>
log.debug("relaying trampoline payment to next trampoline node")
val payFSM = spawnOutgoingPayFSM(paymentCfg, multiPart = true)
Expand Down Expand Up @@ -208,7 +214,7 @@ class NodeRelayer(nodeParams: NodeParams, relayer: ActorRef, router: ActorRef, c

object NodeRelayer {

def props(nodeParams: NodeParams, relayer: ActorRef, router: ActorRef, commandBuffer: ActorRef, register: ActorRef) = Props(new NodeRelayer(nodeParams, relayer, router, commandBuffer, register))
def props(nodeParams: NodeParams, router: ActorRef, commandBuffer: ActorRef, register: ActorRef) = Props(new NodeRelayer(nodeParams, router, commandBuffer, register))

/**
* We start by aggregating an incoming HTLC set. Once we have received the whole set, we will compute a route to the next
Expand Down Expand Up @@ -259,15 +265,11 @@ object NodeRelayer {
* should return upstream.
*/
private def translateError(failures: Seq[PaymentFailure], outgoingNodeId: PublicKey): Option[FailureMessage] = {
def tooManyRouteNotFound(failures: Seq[PaymentFailure]): Boolean = {
val routeNotFoundCount = failures.collect { case f@LocalFailure(_, RouteNotFound) => f }.length
routeNotFoundCount > failures.length / 2
}

val routeNotFound = failures.collectFirst { case f@LocalFailure(_, RouteNotFound) => f }.nonEmpty
failures match {
case Nil => None
case LocalFailure(_, PaymentError.BalanceTooLow) :: Nil => Some(TemporaryNodeFailure) // we don't have enough outgoing liquidity at the moment
case _ if tooManyRouteNotFound(failures) => Some(TrampolineFeeInsufficient) // if we couldn't find routes, it's likely that the fee/cltv was insufficient
case LocalFailure(_, BalanceTooLow) :: Nil => Some(TemporaryNodeFailure) // we don't have enough outgoing liquidity at the moment
case _ if routeNotFound => Some(TrampolineFeeInsufficient) // if we couldn't find routes, it's likely that the fee/cltv was insufficient
case _ =>
// Otherwise, we try to find a downstream error that we could decrypt.
val outgoingNodeFailure = failures.collectFirst { case RemoteFailure(_, e) if e.originNode == outgoingNodeId => e.failureMessage }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm

private val postRestartCleaner = context.actorOf(PostRestartHtlcCleaner.props(nodeParams, commandBuffer, initialized))
private val channelRelayer = context.actorOf(ChannelRelayer.props(nodeParams, self, register, commandBuffer))
private val nodeRelayer = context.actorOf(NodeRelayer.props(nodeParams, self, router, commandBuffer, register))
private val nodeRelayer = context.actorOf(NodeRelayer.props(nodeParams, router, commandBuffer, register))

override def receive: Receive = main(Map.empty, mutable.MultiDict.empty[PublicKey, ShortChannelId])

Expand Down Expand Up @@ -202,7 +202,7 @@ class Relayer(nodeParams: NodeParams, router: ActorRef, register: ActorRef, comm
object Relayer extends Logging {

def props(nodeParams: NodeParams, router: ActorRef, register: ActorRef, commandBuffer: ActorRef, paymentHandler: ActorRef, initialized: Option[Promise[Done]] = None) =
Props(classOf[Relayer], nodeParams, router, register, commandBuffer, paymentHandler, initialized)
Props(new Relayer(nodeParams, router, register, commandBuffer, paymentHandler, initialized))

type ChannelUpdates = Map[ShortChannelId, OutgoingChannel]
type NodeChannels = mutable.MultiDict[PublicKey, ShortChannelId]
Expand Down
Loading