Skip to content

Commit

Permalink
Clean up HTLCs after a node restart.
Browse files Browse the repository at this point in the history
The previous logic has been moved from Switchboard to payments.

There are two cases to consider:
  * HTLCs not relayed at all: we can fail instantly
  * HTLCs partially relayed: we need to wait for downstream responses
  • Loading branch information
t-bast committed Nov 29, 2019
1 parent e8ee523 commit 770d32f
Show file tree
Hide file tree
Showing 12 changed files with 842 additions and 449 deletions.
138 changes: 3 additions & 135 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,12 @@ package fr.acinq.eclair.io
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.channel.Helpers.Closing
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.{IncomingPayment, IncomingPaymentStatus, IncomingPaymentsDb, PendingRelayDb}
import fr.acinq.eclair.payment.IncomingPacket
import fr.acinq.eclair.payment.relay.Origin
import fr.acinq.eclair.router.Rebroadcast
import fr.acinq.eclair.transactions.{IN, OUT}
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
import scodec.bits.ByteVector

/**
* Ties network connections to peers.
Expand All @@ -42,13 +34,12 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto

import Switchboard._

// we pass these to helpers classes so that they have the logging context
implicit def implicitLog: LoggingAdapter = log

authenticator ! self

// we load peers and channels from database
{
val peers = nodeParams.db.peers.listPeers()

// Check if channels that are still in CLOSING state have actually been closed. This can happen when the app is stopped
// just after a channel state has transitioned to CLOSED and before it has effectively been removed.
// Closed channels will be removed, other channels will be restored.
Expand All @@ -57,16 +48,6 @@ class Switchboard(nodeParams: NodeParams, authenticator: ActorRef, watcher: Acto
log.info(s"closing channel ${c.channelId}")
nodeParams.db.channels.removeChannel(c.channelId)
})
val peers = nodeParams.db.peers.listPeers()

checkBrokenHtlcsLink(channels, nodeParams.db.payments, nodeParams.privateKey, nodeParams.globalFeatures) match {
case Nil => ()
case brokenHtlcs =>
val brokenHtlcKiller = context.system.actorOf(Props[HtlcReaper], name = "htlc-reaper")
brokenHtlcKiller ! brokenHtlcs
}

cleanupRelayDb(channels, nodeParams.db.pendingRelay)

channels
.groupBy(_.commitments.remoteParams.nodeId)
Expand Down Expand Up @@ -151,117 +132,4 @@ object Switchboard {

def peerActorName(remoteNodeId: PublicKey): String = s"peer-$remoteNodeId"

/**
* If we have stopped eclair while it was handling HTLCs, it is possible that we are in a state were an incoming HTLC
* was committed by both sides, but we didn't have time to send and/or sign the corresponding HTLC to the downstream
* node (if we're an intermediate node) or didn't have time to fail/fulfill the payment (if we're the recipient).
*
* In that case, if we do nothing, the incoming HTLC will eventually expire and we won't lose money, but the channel
* will get closed, which is a major inconvenience.
*
* This check will detect this and will allow us to fast-settle HTLCs and thus preserve channels.
*/
def checkBrokenHtlcsLink(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey, features: ByteVector)(implicit log: LoggingAdapter): Seq[(UpdateAddHtlc, Option[ByteVector32])] = {
// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
// They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when
// we subsequently sign it. That's why we need to look in *their* commitment with direction=OUT.
val htlcs_in = channels
.flatMap(_.commitments.remoteCommit.spec.htlcs)
.filter(_.direction == OUT)
.map(_.add)
.map(IncomingPacket.decrypt(_, privateKey, features))
.collect {
case Right(IncomingPacket.ChannelRelayPacket(add, _, _)) => (add, None) // we consider all relayed htlcs
case Right(IncomingPacket.FinalPacket(add, _)) => paymentsDb.getIncomingPayment(add.paymentHash) match {
case Some(IncomingPayment(_, preimage, _, IncomingPaymentStatus.Received(_, _))) => (add, Some(preimage)) // incoming payment that succeeded
case _ => (add, None) // incoming payment that didn't succeed
}
}

// TODO: @t-bast: will need to update this to take into account trampoline-relayed (and thoroughly test).

// Here we do it differently because we need the origin information.
val relayed_out = channels
.flatMap(_.commitments.originChannels.values)
.collect { case r: Origin.Relayed => r }
.toSet

val htlcs_broken = htlcs_in.filterNot {
case (htlc_in, _) => relayed_out.exists(r => r.originChannelId == htlc_in.channelId && r.originHtlcId == htlc_in.id)
}

log.info(s"htlcs_in=${htlcs_in.size} htlcs_out=${relayed_out.size} htlcs_broken=${htlcs_broken.size}")

htlcs_broken
}

/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
* in a database (see [[fr.acinq.eclair.payment.relay.CommandBuffer]]) because we
* don't want to lose preimages, or to forget to fail incoming htlcs, which
* would lead to unwanted channel closings.
*
* Because of the way our watcher works, in a scenario where a downstream
* channel has gone to the blockchain, it may send several times the same
* command, and the upstream channel may have disappeared in the meantime.
*
* That's why we need to periodically clean up the pending relay db.
*/
def cleanupRelayDb(channels: Seq[HasCommitments], relayDb: PendingRelayDb)(implicit log: LoggingAdapter): Int = {

// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
// If the HTLC is not in their commitment, it means that we have already fulfilled/failed it and that we can remove
// the command from the pending relay db.
val channel2Htlc: Set[(ByteVector32, Long)] =
channels
.flatMap(_.commitments.remoteCommit.spec.htlcs)
.filter(_.direction == OUT)
.map(htlc => (htlc.add.channelId, htlc.add.id))
.toSet

val pendingRelay: Set[(ByteVector32, Long)] = relayDb.listPendingRelay()

val toClean = pendingRelay -- channel2Htlc

toClean.foreach {
case (channelId, htlcId) =>
log.info(s"cleaning up channelId=$channelId htlcId=$htlcId from relay db")
relayDb.removePendingRelay(channelId, htlcId)
}
toClean.size
}

}

class HtlcReaper extends Actor with ActorLogging {

context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])

override def receive: Receive = {
case initialHtlcs: Seq[(UpdateAddHtlc, Option[ByteVector32])]@unchecked => context become main(initialHtlcs)
}

def main(htlcs: Seq[(UpdateAddHtlc, Option[ByteVector32])]): Receive = {
case ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING, NORMAL | SHUTDOWN | CLOSING, data: HasCommitments) =>
val acked = htlcs
.filter(_._1.channelId == data.channelId) // only consider htlcs related to this channel
.filter {
case (htlc, preimage) if Commitments.getHtlcCrossSigned(data.commitments, IN, htlc.id).isDefined =>
// this htlc is cross signed in the current commitment, we can settle it
preimage match {
case Some(preimage) =>
log.info(s"fulfilling broken htlc=$htlc")
channel ! CMD_FULFILL_HTLC(htlc.id, preimage, commit = true)
case None =>
log.info(s"failing broken htlc=$htlc")
channel ! CMD_FAIL_HTLC(htlc.id, Right(TemporaryNodeFailure), commit = true)
}
false // the channel may very well be disconnected before we sign (=ack) the fail/fulfill, so we keep it for now
case _ =>
true // the htlc has already been failed, we can forget about it now
}
acked.foreach { case (htlc, _) => log.info(s"forgetting htlc id=${htlc.id} channelId=${htlc.channelId}") }
context become main(htlcs diff acked)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ class NodeRelayer(nodeParams: NodeParams, relayer: ActorRef, router: ActorRef, c
handler ! PoisonPill
validateRelay(nodeParams, upstream, nextPayload) match {
case Some(failure) =>
log.warning(s"rejecting trampoline payment with paymentHash=$paymentHash (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} htlcCount=${parts.length} reason=$failure)")
log.warning(s"rejecting trampoline payment with paymentHash=$paymentHash (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv} htlcCount=${parts.length} reason=$failure)")
rejectPayment(upstream, Some(failure))
case None =>
log.info(s"relaying trampoline payment with paymentHash=$paymentHash (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} htlcCount=${parts.length})")
log.info(s"relaying trampoline payment with paymentHash=$paymentHash (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv} htlcCount=${parts.length})")
val paymentId = relay(paymentHash, upstream, nextPayload, nextPacket)
context become main(pendingIncoming - paymentHash, pendingOutgoing + (paymentId -> upstream))
}
Expand Down
Loading

0 comments on commit 770d32f

Please sign in to comment.