Skip to content

Commit

Permalink
Relay Trampoline payments (#1220)
Browse files Browse the repository at this point in the history
Start relaying trampoline payments with multi-part aggregation (disabled by default,
must be enabled with config).
Recovery after a restart is correctly handled, even if payments were being forwarded.
No DB schema update in this commit.

The trampoline UX will be somewhat bad because many improvements/polish are missing.
Some shortcuts were taken, a few hacks here and there need to be fixed, but nothing too scary.
Those improvements will be done in separate commits before the next release.
  • Loading branch information
t-bast authored Dec 18, 2019
1 parent 2d95168 commit 611f0cf
Show file tree
Hide file tree
Showing 43 changed files with 1,967 additions and 528 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ case class CltvExpiryDelta(private val underlying: Int) extends Ordered[CltvExpi
// @formatter:off
def +(other: Int): CltvExpiryDelta = CltvExpiryDelta(underlying + other)
def +(other: CltvExpiryDelta): CltvExpiryDelta = CltvExpiryDelta(underlying + other.underlying)
def -(other: CltvExpiryDelta): CltvExpiryDelta = CltvExpiryDelta(underlying - other.underlying)
def compare(other: CltvExpiryDelta): Int = underlying.compareTo(other.underlying)
def toInt: Int = underlying
// @formatter:on
Expand Down
10 changes: 9 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.{IncomingPayment, NetworkFee, OutgoingPayment, Stats}
import fr.acinq.eclair.io.Peer.{GetPeerInfo, PeerInfo}
import fr.acinq.eclair.io.{NodeURI, Peer}
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentRequest
import fr.acinq.eclair.payment.send.PaymentInitiator.{SendPaymentRequest, SendTrampolinePaymentRequest}
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels, UsableBalance}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
Expand Down Expand Up @@ -88,6 +88,8 @@ trait Eclair {

def send(externalId_opt: Option[String], recipientNodeId: PublicKey, amount: MilliSatoshi, paymentHash: ByteVector32, invoice_opt: Option[PaymentRequest] = None, maxAttempts_opt: Option[Int] = None, feeThresholdSat_opt: Option[Satoshi] = None, maxFeePct_opt: Option[Double] = None)(implicit timeout: Timeout): Future[UUID]

def sendToTrampoline(invoice: PaymentRequest, trampolineId: PublicKey, trampolineFees: MilliSatoshi, trampolineExpiryDelta: CltvExpiryDelta)(implicit timeout: Timeout): Future[UUID]

def sentInfo(id: Either[UUID, ByteVector32])(implicit timeout: Timeout): Future[Seq[OutgoingPayment]]

def findRoute(targetNodeId: PublicKey, amount: MilliSatoshi, assistedRoutes: Seq[Seq[PaymentRequest.ExtraHop]] = Seq.empty)(implicit timeout: Timeout): Future[RouteResponse]
Expand Down Expand Up @@ -244,6 +246,12 @@ class EclairImpl(appKit: Kit) extends Eclair {
}
}

override def sendToTrampoline(invoice: PaymentRequest, trampolineId: PublicKey, trampolineFees: MilliSatoshi, trampolineExpiryDelta: CltvExpiryDelta)(implicit timeout: Timeout): Future[UUID] = {
val defaultRouteParams = Router.getDefaultRouteParams(appKit.nodeParams.routerConf)
val sendPayment = SendTrampolinePaymentRequest(invoice.amount.get, trampolineFees, invoice, trampolineId, invoice.minFinalCltvExpiryDelta.getOrElse(Channel.MIN_CLTV_EXPIRY_DELTA), trampolineExpiryDelta, Some(defaultRouteParams))
(appKit.paymentInitiator ? sendPayment).mapTo[UUID]
}

override def sentInfo(id: Either[UUID, ByteVector32])(implicit timeout: Timeout): Future[Seq[OutgoingPayment]] = Future {
id match {
case Left(uuid) => appKit.nodeParams.db.payments.listOutgoingPayments(uuid)
Expand Down
10 changes: 7 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.crypto.LocalKeyManager
import fr.acinq.eclair.db.{BackupHandler, Databases}
import fr.acinq.eclair.io.{Authenticator, Server, Switchboard}
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.payment.Auditor
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{CommandBuffer, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.TorProtocolHandler.OnionServiceVersion
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
Expand Down Expand Up @@ -210,6 +210,7 @@ class Setup(datadir: File,
zmqTxConnected = Promise[Done]()
tcpBound = Promise[Done]()
routerInitialized = Promise[Done]()
postRestartCleanUpInitialized = Promise[Done]()

defaultFeerates = {
val confDefaultFeerates = FeeratesPerKB(
Expand Down Expand Up @@ -284,8 +285,11 @@ class Setup(datadir: File,
register = system.actorOf(SimpleSupervisor.props(Props(new Register), "register", SupervisorStrategy.Resume))
commandBuffer = system.actorOf(SimpleSupervisor.props(Props(new CommandBuffer(nodeParams, register)), "command-buffer", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, commandBuffer), "payment-handler", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, register, commandBuffer, paymentHandler), "relayer", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, commandBuffer, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
authenticator = system.actorOf(SimpleSupervisor.props(Authenticator.props(nodeParams), "authenticator", SupervisorStrategy.Resume))
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future
switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, authenticator, watcher, router, relayer, paymentHandler, wallet), "switchboard", SupervisorStrategy.Resume))
server = system.actorOf(SimpleSupervisor.props(Server.props(nodeParams, authenticator, serverBindingAddress, Some(tcpBound)), "server", SupervisorStrategy.Restart))
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, router, relayer, register), "payment-initiator", SupervisorStrategy.Restart))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
def origin(c: CMD_ADD_HTLC): Origin = c.upstream match {
case Upstream.Local(id) => Origin.Local(id, Some(sender)) // we were the origin of the payment
case Upstream.Relayed(u) => Origin.Relayed(u.channelId, u.id, u.amountMsat, c.amount) // this is a relayed payment to an outgoing channel
case Upstream.TrampolineRelayed(us) => Origin.TrampolineRelayed(us.map(u => (u.channelId, u.id)).toList, Some(sender)) // this is a relayed payment to an outgoing node
}

def feePaid(fee: Satoshi, tx: Transaction, desc: String, channelId: ByteVector32): Unit = Try { // this may fail with an NPE in tests because context has been cleaned up, but it's not a big deal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ object Upstream {
final case class Local(id: UUID) extends Upstream
/** Our node forwarded a single incoming HTLC to an outgoing channel. */
final case class Relayed(add: UpdateAddHtlc) extends Upstream
/** Our node forwarded an incoming HTLC set to a remote outgoing node (potentially producing multiple downstream HTLCs). */
final case class TrampolineRelayed(adds: Seq[UpdateAddHtlc]) extends Upstream {
val amountIn: MilliSatoshi = adds.map(_.amountMsat).sum
val expiryIn: CltvExpiry = adds.map(_.cltvExpiry).min
}
}

sealed trait Command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ case class Commitments(channelVersion: ChannelVersion,
localCommit: LocalCommit, remoteCommit: RemoteCommit,
localChanges: LocalChanges, remoteChanges: RemoteChanges,
localNextHtlcId: Long, remoteNextHtlcId: Long,
originChannels: Map[Long, Origin], // for outgoing htlcs relayed through us, the id of the previous channel
originChannels: Map[Long, Origin], // for outgoing htlcs relayed through us, details about the corresponding incoming htlcs
remoteNextCommitInfo: Either[WaitingForRevocation, PublicKey],
commitInput: InputInfo,
remotePerCommitmentSecrets: ShaChain, channelId: ByteVector32) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,15 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.setLong(1, e.amountIn.toLong)
statement.setLong(2, e.amountOut.toLong)
statement.setBytes(3, e.paymentHash.toArray)
statement.setBytes(4, e.fromChannelId.toArray)
statement.setBytes(5, e.toChannelId.toArray)
e match {
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _) =>
statement.setBytes(4, fromChannelId.toArray)
statement.setBytes(5, toChannelId.toArray)
case TrampolinePaymentRelayed(_, _, _, _, fromChannelIds, toChannelIds, _) =>
// TODO: @t-bast: we should change the DB schema to allow accurate Trampoline reporting
statement.setBytes(4, fromChannelIds.head.toArray)
statement.setBytes(5, toChannelIds.head.toArray)
}
statement.setLong(6, e.timestamp)
statement.executeUpdate()
}
Expand Down Expand Up @@ -214,7 +221,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
val rs = statement.executeQuery()
var q: Queue[PaymentRelayed] = Queue()
while (rs.next()) {
q = q :+ PaymentRelayed(
q = q :+ ChannelPaymentRelayed(
amountIn = MilliSatoshi(rs.getLong("amount_in_msat")),
amountOut = MilliSatoshi(rs.getLong("amount_out_msat")),
paymentHash = rs.getByteVector32("payment_hash"),
Expand Down
136 changes: 3 additions & 133 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,12 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.channel.Helpers.Closing
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.{IncomingPayment, IncomingPaymentStatus, IncomingPaymentsDb, PendingRelayDb}
import fr.acinq.eclair.payment.IncomingPacket
import fr.acinq.eclair.payment.relay.Origin
import fr.acinq.eclair.router.Rebroadcast
import fr.acinq.eclair.transactions.{DirectedHtlc, IN, OUT}
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
import scodec.bits.ByteVector

/**
* Ties network connections to peers.
Expand All @@ -42,13 +34,12 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto

import Switchboard._

// we pass these to helpers classes so that they have the logging context
implicit def implicitLog: LoggingAdapter = log

authenticator ! self

// we load peers and channels from database
{
val peers = nodeParams.db.peers.listPeers()

// Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped
// just after a channel state has transitioned to CLOSED and before it has effectively been removed.
// Closed channels will be removed, other channels will be restored.
Expand All @@ -57,16 +48,6 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
log.info(s"closing channel ${c.channelId}")
nodeParams.db.channels.removeChannel(c.channelId)
})
val peers = nodeParams.db.peers.listPeers()

checkBrokenHtlcsLink(channels, nodeParams.db.payments, nodeParams.privateKey, nodeParams.globalFeatures) match {
case Nil => ()
case brokenHtlcs =>
val brokenHtlcKiller = context.system.actorOf(Props[HtlcReaper], name = "htlc-reaper")
brokenHtlcKiller ! brokenHtlcs
}

cleanupRelayDb(channels, nodeParams.db.pendingRelay)

channels
.groupBy(_.commitments.remoteParams.nodeId)
Expand Down Expand Up @@ -151,115 +132,4 @@ object Switchboard {

def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"

/**
* If we have stopped eclair while it was handling HTLCs, it is possible that we are in a state were an incoming HTLC
* was committed by both sides, but we didn't have time to send and/or sign the corresponding HTLC to the downstream
* node (if we're an intermediate node) or didn't have time to fail/fulfill the payment (if we're the recipient).
*
* In that case, if we do nothing, the incoming HTLC will eventually expire and we won't lose money, but the channel
* will get closed, which is a major inconvenience.
*
* This check will detect this and will allow us to fast-settle HTLCs and thus preserve channels.
*/
def checkBrokenHtlcsLink(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey, features: ByteVector)(implicit log: LoggingAdapter): Seq[(UpdateAddHtlc, Option[ByteVector32])] = {
// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
// They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when
// we subsequently sign it. That's why we need to look in *their* commitment with direction=OUT.
val htlcs_in = channels
.flatMap(_.commitments.remoteCommit.spec.htlcs)
.filter(_.direction == OUT)
.map(_.add)
.map(IncomingPacket.decrypt(_, privateKey, features))
.collect {
case Right(IncomingPacket.ChannelRelayPacket(add, _, _)) => (add, None) // we consider all relayed htlcs
case Right(IncomingPacket.FinalPacket(add, _)) => paymentsDb.getIncomingPayment(add.paymentHash) match {
case Some(IncomingPayment(_, preimage, _, IncomingPaymentStatus.Received(_, _))) => (add, Some(preimage)) // incoming payment that succeeded
case _ => (add, None) // incoming payment that didn't succeed
}
}

// TODO: @t-bast: will need to update this to take into account trampoline-relayed (and thoroughly test).

// Here we do it differently because we need the origin information.
val relayed_out = channels
.flatMap(_.commitments.originChannels.values)
.collect { case r: Origin.Relayed => r }
.toSet

val htlcs_broken = htlcs_in.filterNot {
case (htlc_in, _) => relayed_out.exists(r => r.originChannelId == htlc_in.channelId && r.originHtlcId == htlc_in.id)
}

log.info(s"htlcs_in=${htlcs_in.size} htlcs_out=${relayed_out.size} htlcs_broken=${htlcs_broken.size}")

htlcs_broken
}

/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
* in a database (see [[fr.acinq.eclair.payment.relay.CommandBuffer]]) because we
* don't want to lose preimages, or to forget to fail incoming htlcs, which
* would lead to unwanted channel closings.
*
* Because of the way our watcher works, in a scenario where a downstream
* channel has gone to the blockchain, it may send several times the same
* command, and the upstream channel may have disappeared in the meantime.
*
* That's why we need to periodically clean up the pending relay db.
*/
def cleanupRelayDb(channels: Seq[HasCommitments], relayDb: PendingRelayDb)(implicit log: LoggingAdapter): Int = {
// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
// If the HTLC is not in their commitment, it means that we have already fulfilled/failed it and that we can remove
// the command from the pending relay db.
val channel2Htlc: Set[(ByteVector32, Long)] =
channels
.flatMap(_.commitments.remoteCommit.spec.htlcs)
.filter(_.direction == OUT)
.map(htlc => (htlc.add.channelId, htlc.add.id))
.toSet

val pendingRelay: Set[(ByteVector32, Long)] = relayDb.listPendingRelay()

val toClean = pendingRelay -- channel2Htlc

toClean.foreach {
case (channelId, htlcId) =>
log.info(s"cleaning up channelId=$channelId htlcId=$htlcId from relay db")
relayDb.removePendingRelay(channelId, htlcId)
}
toClean.size
}
}

class HtlcReaper extends Actor with ActorLogging {

context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])

override def receive: Receive = {
case initialHtlcs: Seq[(UpdateAddHtlc, Option[ByteVector32])]@unchecked => context become main(initialHtlcs)
}

def main(htlcs: Seq[(UpdateAddHtlc, Option[ByteVector32])]): Receive = {
case ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING, NORMAL | SHUTDOWN | CLOSING, data: HasCommitments) =>
val acked = htlcs
.filter(_._1.channelId == data.channelId) // only consider htlcs related to this channel
.filter {
case (htlc, preimage) if Commitments.getHtlcCrossSigned(data.commitments, IN, htlc.id).isDefined =>
// this htlc is cross signed in the current commitment, we can settle it
preimage match {
case Some(preimage) =>
log.info(s"fulfilling broken htlc=$htlc")
channel ! CMD_FULFILL_HTLC(htlc.id, preimage, commit = true)
case None =>
log.info(s"failing broken htlc=$htlc")
channel ! CMD_FAIL_HTLC(htlc.id, Right(TemporaryNodeFailure), commit = true)
}
false // the channel may very well be disconnected before we sign (=ack) the fail/fulfill, so we keep it for now
case _ =>
true // the htlc has already been failed, we can forget about it now
}
acked.foreach { case (htlc, _) => log.info(s"forgetting htlc id=${htlc.id} channelId=${htlc.channelId}") }
context become main(htlcs diff acked)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,20 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
db.add(e)

case e: PaymentRelayed =>
val relayType = e match {
case _: ChannelPaymentRelayed => "channel"
case _: TrampolinePaymentRelayed => "trampoline"
}
Kamon
.histogram("payment.hist")
.withTag("direction", "relayed")
.withTag("relay", relayType)
.withTag("type", "total")
.record(e.amountIn.truncateToSatoshi.toLong)
Kamon
.histogram("payment.hist")
.withTag("direction", "relayed")
.withTag("relay", relayType)
.withTag("type", "fee")
.record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
db.add(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fr.acinq.eclair.payment
import java.util.UUID

import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.MilliSatoshi
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.router.ChannelHop
Expand Down Expand Up @@ -53,7 +54,14 @@ object PaymentSent {

case class PaymentFailed(id: UUID, paymentHash: ByteVector32, failures: Seq[PaymentFailure], timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent
sealed trait PaymentRelayed extends PaymentEvent {
val amountIn: MilliSatoshi
val amountOut: MilliSatoshi
}

case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentRelayed

case class TrampolinePaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, toNodeId: PublicKey, fromChannelIds: Seq[ByteVector32], toChannelIds: Seq[ByteVector32], timestamp: Long = Platform.currentTime) extends PaymentRelayed

case class PaymentReceived(paymentHash: ByteVector32, parts: Seq[PaymentReceived.PartialPayment]) extends PaymentEvent {
require(parts.nonEmpty, "must have at least one subpayment")
Expand Down
Loading

0 comments on commit 611f0cf

Please sign in to comment.