Skip to content

Commit

Permalink
Unwatch obsolete transactions
Browse files Browse the repository at this point in the history
We unwatch:

- channels that have been spliced or closed
- RBF candidates of a splice or closing transaction

We revert the modification to channel updates to avoid invalidating
the signature.
  • Loading branch information
t-bast authored and remyers committed Dec 10, 2024
1 parent e79c65b commit 0f250b1
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ object ZmqWatcher {
def spendingTx: Transaction
}

case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] {
override def hints: Set[TxId] = Set.empty
}
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { override def hints: Set[TxId] = Set.empty }
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered
case class UnwatchExternalChannelSpent(txId: TxId, outputIndex: Int) extends Command

case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered]
case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered
Expand Down Expand Up @@ -356,6 +355,11 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
watching(watches -- deprecatedWatches, watchedUtxos)

case UnwatchExternalChannelSpent(txId, outputIndex) =>
val deprecatedWatches = watches.keySet.collect { case w: WatchExternalChannelSpent if w.txId == txId && w.outputIndex == outputIndex => w }
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
watching(watches -- deprecatedWatches, watchedUtxos1)

case ValidateRequest(replyTo, ann) =>
client.validate(ann).map(replyTo ! _)
Behaviors.same
Expand Down
16 changes: 4 additions & 12 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchTxConfirmed, WatchTxConfirmedTriggered}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.ANNOUNCEMENTS_MINCONF
import fr.acinq.eclair.crypto.TransportHandler
Expand Down Expand Up @@ -263,14 +262,14 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)

case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
val txId = d.channels.getOrElse(shortChannelId, d.prunedChannels(shortChannelId)).fundingTxId
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", txId, shortChannelId, spendingTx.txid)
val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2)
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))

case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, shortChannelId)
case None => stay()
}

Expand Down Expand Up @@ -585,7 +584,6 @@ object Router {
def +(ignoreNode: PublicKey): Ignore = copy(nodes = nodes + ignoreNode)
def ++(ignoreNodes: Set[PublicKey]): Ignore = copy(nodes = nodes ++ ignoreNodes)
def +(ignoreChannel: ChannelDesc): Ignore = copy(channels = channels + ignoreChannel)
def emptyNodes(): Ignore = copy(nodes = Set.empty)
def emptyChannels(): Ignore = copy(channels = Set.empty)
// @formatter:on
}
Expand Down Expand Up @@ -634,12 +632,6 @@ object Router {
/** Full route including the final hop, if any. */
val fullRoute: Seq[Hop] = hops ++ finalHop_opt.toSeq

/**
* Fee paid for the trampoline hop, if any.
* Note that when using MPP to reach the trampoline node, the trampoline fee must be counted only once.
*/
val trampolineFee: MilliSatoshi = finalHop_opt.collect { case hop: NodeHop => hop.fee(amount) }.getOrElse(0 msat)

/**
* Fee paid for the blinded route, if any.
* Note that when we are the introduction node for the blinded route, we cannot easily compute the fee without the
Expand Down
30 changes: 19 additions & 11 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write}
import fr.acinq.bitcoin.scalacompat.{Satoshi, TxId}
import fr.acinq.eclair.ShortChannelId.outputIndex
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{UtxoStatus, ValidateRequest, ValidateResult, WatchExternalChannelSpent}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.db.NetworkDb
Expand All @@ -33,7 +33,7 @@ import fr.acinq.eclair.router.Monitoring.Metrics
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, Logs, MilliSatoshiLong, NodeParams, RealShortChannelId, ShortChannelId, TimestampSecond, TxCoordinates}
import fr.acinq.eclair.{BlockHeight, Logs, MilliSatoshiLong, NodeParams, RealShortChannelId, ShortChannelId, TxCoordinates}

object Validation {

Expand Down Expand Up @@ -120,8 +120,10 @@ object Validation {
Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel))
case None =>
log.error("spent parent channel shortChannelId={} not found for splice shortChannelId={}", parentScid, c.shortChannelId)
val spentChannels1 = d0.spentChannels.filter(_._2 != parentScid)
Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None).copy(spentChannels = spentChannels1))
val spendingTxs = d0.spentChannels.filter(_._2 == parentScid).keySet
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val d1 = d0.copy(spentChannels = d0.spentChannels -- spendingTxs)
Some(addPublicChannel(d1, nodeParams, watcher, c, tx.txid, capacity, None))
}
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
}
Expand Down Expand Up @@ -171,6 +173,7 @@ object Validation {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val fundingOutputIndex = outputIndex(ann.shortChannelId)
watcher ! WatchExternalChannelSpent(ctx.self, spliceTxId, fundingOutputIndex, ann.shortChannelId)
watcher ! UnwatchExternalChannelSpent(parentChannel.fundingTxId, outputIndex(parentChannel.ann.shortChannelId))
// we notify front nodes that the channel has been replaced
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil))
ctx.system.eventStream.publish(ChannelLost(parentChannel.shortChannelId))
Expand All @@ -180,15 +183,17 @@ object Validation {
ann = ann,
fundingTxId = spliceTxId,
capacity = capacity,
// update the timestamps of the channel updates to ensure the spliced channel is not pruned
update_1_opt = parentChannel.update_1_opt.map(_.copy(shortChannelId = ann.shortChannelId, timestamp = TimestampSecond.now())),
update_2_opt = parentChannel.update_2_opt.map(_.copy(shortChannelId = ann.shortChannelId, timestamp = TimestampSecond.now())),
// we keep the previous channel updates to ensure that the channel is still used until we receive the new ones
update_1_opt = parentChannel.update_1_opt,
update_2_opt = parentChannel.update_2_opt,
)
log.debug("replacing parent channel scid={} with splice channel scid={}; splice channel={}", parentChannel.shortChannelId, ann.shortChannelId, newPubChan)
// we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid
log.debug("updating the graph for shortChannelId={}", newPubChan.shortChannelId)
val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity)
val spentChannels1 = d.spentChannels.filter(_._2 != parentChannel.shortChannelId)
val spendingTxs = d.spentChannels.filter(_._2 == parentChannel.shortChannelId).keySet
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val spentChannels1 = d.spentChannels -- spendingTxs
d.copy(
// we also add the splice scid -> channelId and remove the parent scid -> channelId mappings
channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId,
Expand Down Expand Up @@ -262,9 +267,9 @@ object Validation {
} else d1
}

def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.ann
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get
log.info("funding tx for channelId={} was spent", shortChannelId)
// we need to remove nodes that aren't tied to any channels anymore
val channels1 = d.channels - shortChannelId
Expand All @@ -287,7 +292,10 @@ object Validation {
// we no longer need to track this or alternative transactions that spent the parent channel
// either this channel was really closed, or it was spliced and the announcement was not received in time
// we will re-add a spliced channel as a new channel later when we receive the announcement
val spentChannels1 = d.spentChannels.filter(_._2 != shortChannelId)
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val spentChannels1 = d.spentChannels -- spendingTxs
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,54 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
})
}

test("unwatch external channel") {
withWatcher(f => {
import f._

val (priv, address) = createExternalAddress()
val tx1 = sendToAddress(address, 250_000 sat, probe)
val outputIndex1 = tx1.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(priv.publicKey)))
val spendingTx1 = createSpendP2WPKH(tx1, priv, priv.publicKey, 500 sat, 0, 0)
val tx2 = sendToAddress(address, 200_000 sat, probe)
val outputIndex2 = tx2.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(priv.publicKey)))
val spendingTx2 = createSpendP2WPKH(tx2, priv, priv.publicKey, 500 sat, 0, 0)

watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, outputIndex1, RealShortChannelId(3))
watcher ! WatchExternalChannelSpent(probe.ref, tx2.txid, outputIndex2, RealShortChannelId(5))
watcher ! UnwatchExternalChannelSpent(tx1.txid, outputIndex1 + 1) // ignored
watcher ! UnwatchExternalChannelSpent(randomTxId(), outputIndex1) // ignored

// When publishing the transaction, the watch triggers immediately.
bitcoinClient.publishTransaction(spendingTx1)
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1))
probe.expectNoMessage(100 millis)

// If we unwatch the transaction, we will ignore when it's published.
watcher ! UnwatchExternalChannelSpent(tx2.txid, outputIndex2)
bitcoinClient.publishTransaction(spendingTx2)
probe.expectNoMessage(100 millis)

// If we watch again, this will trigger immediately because the transaction is in the mempool.
watcher ! WatchExternalChannelSpent(probe.ref, tx2.txid, outputIndex2, RealShortChannelId(5))
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(5), spendingTx2))
probe.expectNoMessage(100 millis)

// We make the transactions confirm while we're not watching.
watcher ! UnwatchExternalChannelSpent(tx1.txid, outputIndex1)
watcher ! UnwatchExternalChannelSpent(tx2.txid, outputIndex2)
bitcoinClient.getBlockHeight().pipeTo(probe.ref)
val initialBlockHeight = probe.expectMsgType[BlockHeight]
system.eventStream.subscribe(probe.ref, classOf[CurrentBlockHeight])
generateBlocks(1)
awaitCond(probe.expectMsgType[CurrentBlockHeight].blockHeight >= initialBlockHeight + 1)

// If we watch again after confirmation, the watch instantly triggers.
watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, outputIndex1, RealShortChannelId(3))
probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1))
probe.expectNoMessage(100 millis)
})
}

test("watch for unknown spent transactions") {
withWatcher(f => {
import f._
Expand Down
45 changes: 25 additions & 20 deletions eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, CltvExpiryDelta, Features, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TestConstants, TimestampSecond, randomBytes32, randomKey}
import org.scalatest.Inside.inside
import scodec.bits._

import scala.concurrent.duration._
Expand Down Expand Up @@ -318,9 +319,11 @@ class RouterSpec extends BaseRouterSpec {
probe.expectMsg(PublicNode(node_b, 2, publicChannelCapacity * 2))
}

def spendingTx(node1: PublicKey, node2: PublicKey): Transaction = {
val originalFundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0)
Transaction(version = 0, txIn = TxIn(outPoint = OutPoint(originalFundingTx,0), signatureScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))), sequence = 0) :: Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node1)))) :: Nil, lockTime = 0)
def spendingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = {
val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2)))
val previousFundingTx = Transaction(version = 2, txIn = Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0)
val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(previousFundingTx, 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0)
nextFundingTx
}

test("properly announce lost channels and nodes") { fixture =>
Expand Down Expand Up @@ -1098,11 +1101,6 @@ class RouterSpec extends BaseRouterSpec {
}
}

def spliceTx(node1: PublicKey, node2: PublicKey, newCapacity: Satoshi): Transaction = {
val originalFundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0)
Transaction(version = 0, txIn = TxIn(outPoint = OutPoint(originalFundingTx,0), signatureScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))), sequence = 0) :: Nil, txOut = TxOut(newCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0)
}

test("update an existing channel after a splice") { fixture =>
import fixture._

Expand All @@ -1111,15 +1109,23 @@ class RouterSpec extends BaseRouterSpec {
val peerConnection = TestProbe()

// Channel ab is spent by a splice tx.
val newCapacity = publicChannelCapacity - 100_000.sat
router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx(funding_a, funding_b, newCapacity))
assert(watcher.expectMsgType[WatchTxConfirmed].minDepth == 12)
val capacity1 = publicChannelCapacity - 100_000.sat
val spliceTx1 = spendingTx(funding_a, funding_b, capacity1)
router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx1)
inside(watcher.expectMsgType[WatchTxConfirmed]) { w =>
assert(w.txId == spliceTx1.txid)
assert(w.minDepth == 12)
}
eventListener.expectNoMessage(100 millis)

// Channel ab is spent and confirmed by an RBF of splice tx.
val newCapacity1 = publicChannelCapacity - 100_000.sat - 1000.sat
router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx(funding_a, funding_b, newCapacity1))
assert(watcher.expectMsgType[WatchTxConfirmed].minDepth == 12)
val capacity2 = publicChannelCapacity - 100_000.sat - 1000.sat
val spliceTx2 = spendingTx(funding_a, funding_b, capacity2)
router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx2)
inside(watcher.expectMsgType[WatchTxConfirmed]) { w =>
assert(w.txId == spliceTx2.txid)
assert(w.minDepth == 12)
}
eventListener.expectNoMessage(100 millis)

// The splice of channel ab is announced.
Expand All @@ -1128,7 +1134,7 @@ class RouterSpec extends BaseRouterSpec {
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn))
peerConnection.expectNoMessage(100 millis)
assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn)
watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx(funding_a, funding_b, newCapacity1), UtxoStatus.Unspent)))
watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx2, UtxoStatus.Unspent)))
peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn))
peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn))
assert(peerConnection.sender() == router)
Expand All @@ -1141,24 +1147,23 @@ class RouterSpec extends BaseRouterSpec {
val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get
assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty)
assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty)
assert(edge_ab.capacity == newCapacity1 && edge_ba.capacity == newCapacity1)
assert(edge_ab.capacity == capacity2 && edge_ba.capacity == capacity2)

// The channel update for the splice is confirmed and the channel is not removed.
router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b))
eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, newCapacity1, None, None) :: Nil))
eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacity2, None, None) :: Nil))
eventListener.expectMsg(ChannelLost(scid_ab))
peerConnection.expectNoMessage(100 millis)
eventListener.expectNoMessage(100 millis)

// The router no longer tracks the parent scid
// The router no longer tracks the parent scid.
val probe = TestProbe()
awaitAssert({
probe.send(router, GetRouterData)
val routerData = probe.expectMsgType[Data]
assert(routerData.spentChannels.isEmpty)
assert(!routerData.channels.contains(scid_ab))
})

}


}

0 comments on commit 0f250b1

Please sign in to comment.