diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala index feb6c787f8..766750e76d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala @@ -120,10 +120,9 @@ object ZmqWatcher { def spendingTx: Transaction } - case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { - override def hints: Set[TxId] = Set.empty - } + case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { override def hints: Set[TxId] = Set.empty } case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered + case class UnwatchExternalChannelSpent(txId: TxId, outputIndex: Int) extends Command case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered] case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered @@ -356,6 +355,11 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client } watching(watches -- deprecatedWatches, watchedUtxos) + case UnwatchExternalChannelSpent(txId, outputIndex) => + val deprecatedWatches = watches.keySet.collect { case w: WatchExternalChannelSpent if w.txId == txId && w.outputIndex == outputIndex => w } + val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) } + watching(watches -- deprecatedWatches, watchedUtxos1) + case ValidateRequest(replyTo, ann) => client.validate(ann).map(replyTo ! _) Behaviors.same diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 811ca13bc9..c0a905101b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -25,9 +25,8 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId} import fr.acinq.eclair.Logs.LogCategory import fr.acinq.eclair._ -import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchTxConfirmed, WatchTxConfirmedTriggered} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fsm.Channel.ANNOUNCEMENTS_MINCONF import fr.acinq.eclair.crypto.TransportHandler @@ -263,14 +262,14 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r) case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) => - val txId = d.channels.getOrElse(shortChannelId, d.prunedChannels(shortChannelId)).fundingTxId - log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", txId, shortChannelId, spendingTx.txid) + val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId + log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid) watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2) stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId)) case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) => d.spentChannels.get(spendingTx.txid) match { - case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId) + case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, shortChannelId) case None => stay() } @@ -585,7 +584,6 @@ object Router { def +(ignoreNode: PublicKey): Ignore = copy(nodes = nodes + ignoreNode) def ++(ignoreNodes: Set[PublicKey]): Ignore = copy(nodes = nodes ++ ignoreNodes) def +(ignoreChannel: ChannelDesc): Ignore = copy(channels = channels + ignoreChannel) - def emptyNodes(): Ignore = copy(nodes = Set.empty) def emptyChannels(): Ignore = copy(channels = Set.empty) // @formatter:on } @@ -634,12 +632,6 @@ object Router { /** Full route including the final hop, if any. */ val fullRoute: Seq[Hop] = hops ++ finalHop_opt.toSeq - /** - * Fee paid for the trampoline hop, if any. - * Note that when using MPP to reach the trampoline node, the trampoline fee must be counted only once. - */ - val trampolineFee: MilliSatoshi = finalHop_opt.collect { case hop: NodeHop => hop.fee(amount) }.getOrElse(0 msat) - /** * Fee paid for the blinded route, if any. * Note that when we are the introduction node for the blinded route, we cannot easily compute the fee without the diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index b1ba01492f..6556eb0aef 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -24,7 +24,7 @@ import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} import fr.acinq.bitcoin.scalacompat.{Satoshi, TxId} import fr.acinq.eclair.ShortChannelId.outputIndex import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{UtxoStatus, ValidateRequest, ValidateResult, WatchExternalChannelSpent} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel._ import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.db.NetworkDb @@ -33,7 +33,7 @@ import fr.acinq.eclair.router.Monitoring.Metrics import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire.protocol._ -import fr.acinq.eclair.{BlockHeight, Logs, MilliSatoshiLong, NodeParams, RealShortChannelId, ShortChannelId, TimestampSecond, TxCoordinates} +import fr.acinq.eclair.{BlockHeight, Logs, MilliSatoshiLong, NodeParams, RealShortChannelId, ShortChannelId, TxCoordinates} object Validation { @@ -120,8 +120,10 @@ object Validation { Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel)) case None => log.error("spent parent channel shortChannelId={} not found for splice shortChannelId={}", parentScid, c.shortChannelId) - val spentChannels1 = d0.spentChannels.filter(_._2 != parentScid) - Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None).copy(spentChannels = spentChannels1)) + val spendingTxs = d0.spentChannels.filter(_._2 == parentScid).keySet + spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + val d1 = d0.copy(spentChannels = d0.spentChannels -- spendingTxs) + Some(addPublicChannel(d1, nodeParams, watcher, c, tx.txid, capacity, None)) } case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None)) } @@ -171,6 +173,7 @@ object Validation { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors val fundingOutputIndex = outputIndex(ann.shortChannelId) watcher ! WatchExternalChannelSpent(ctx.self, spliceTxId, fundingOutputIndex, ann.shortChannelId) + watcher ! UnwatchExternalChannelSpent(parentChannel.fundingTxId, outputIndex(parentChannel.ann.shortChannelId)) // we notify front nodes that the channel has been replaced ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil)) ctx.system.eventStream.publish(ChannelLost(parentChannel.shortChannelId)) @@ -180,15 +183,17 @@ object Validation { ann = ann, fundingTxId = spliceTxId, capacity = capacity, - // update the timestamps of the channel updates to ensure the spliced channel is not pruned - update_1_opt = parentChannel.update_1_opt.map(_.copy(shortChannelId = ann.shortChannelId, timestamp = TimestampSecond.now())), - update_2_opt = parentChannel.update_2_opt.map(_.copy(shortChannelId = ann.shortChannelId, timestamp = TimestampSecond.now())), + // we keep the previous channel updates to ensure that the channel is still used until we receive the new ones + update_1_opt = parentChannel.update_1_opt, + update_2_opt = parentChannel.update_2_opt, ) log.debug("replacing parent channel scid={} with splice channel scid={}; splice channel={}", parentChannel.shortChannelId, ann.shortChannelId, newPubChan) // we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid log.debug("updating the graph for shortChannelId={}", newPubChan.shortChannelId) val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity) - val spentChannels1 = d.spentChannels.filter(_._2 != parentChannel.shortChannelId) + val spendingTxs = d.spentChannels.filter(_._2 == parentChannel.shortChannelId).keySet + spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + val spentChannels1 = d.spentChannels -- spendingTxs d.copy( // we also add the splice scid -> channelId and remove the parent scid -> channelId mappings channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId, @@ -262,9 +267,9 @@ object Validation { } else d1 } - def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { + def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.ann + val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get log.info("funding tx for channelId={} was spent", shortChannelId) // we need to remove nodes that aren't tied to any channels anymore val channels1 = d.channels - shortChannelId @@ -287,7 +292,10 @@ object Validation { // we no longer need to track this or alternative transactions that spent the parent channel // either this channel was really closed, or it was spliced and the announcement was not received in time // we will re-add a spliced channel as a new channel later when we receive the announcement - val spentChannels1 = d.spentChannels.filter(_._2 != shortChannelId) + watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) + val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet + spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + val spentChannels1 = d.spentChannels -- spendingTxs d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index 4f08fad488..18adb2d1e0 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -350,6 +350,54 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind }) } + test("unwatch external channel") { + withWatcher(f => { + import f._ + + val (priv, address) = createExternalAddress() + val tx1 = sendToAddress(address, 250_000 sat, probe) + val outputIndex1 = tx1.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(priv.publicKey))) + val spendingTx1 = createSpendP2WPKH(tx1, priv, priv.publicKey, 500 sat, 0, 0) + val tx2 = sendToAddress(address, 200_000 sat, probe) + val outputIndex2 = tx2.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(priv.publicKey))) + val spendingTx2 = createSpendP2WPKH(tx2, priv, priv.publicKey, 500 sat, 0, 0) + + watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, outputIndex1, RealShortChannelId(3)) + watcher ! WatchExternalChannelSpent(probe.ref, tx2.txid, outputIndex2, RealShortChannelId(5)) + watcher ! UnwatchExternalChannelSpent(tx1.txid, outputIndex1 + 1) // ignored + watcher ! UnwatchExternalChannelSpent(randomTxId(), outputIndex1) // ignored + + // When publishing the transaction, the watch triggers immediately. + bitcoinClient.publishTransaction(spendingTx1) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1)) + probe.expectNoMessage(100 millis) + + // If we unwatch the transaction, we will ignore when it's published. + watcher ! UnwatchExternalChannelSpent(tx2.txid, outputIndex2) + bitcoinClient.publishTransaction(spendingTx2) + probe.expectNoMessage(100 millis) + + // If we watch again, this will trigger immediately because the transaction is in the mempool. + watcher ! WatchExternalChannelSpent(probe.ref, tx2.txid, outputIndex2, RealShortChannelId(5)) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(5), spendingTx2)) + probe.expectNoMessage(100 millis) + + // We make the transactions confirm while we're not watching. + watcher ! UnwatchExternalChannelSpent(tx1.txid, outputIndex1) + watcher ! UnwatchExternalChannelSpent(tx2.txid, outputIndex2) + bitcoinClient.getBlockHeight().pipeTo(probe.ref) + val initialBlockHeight = probe.expectMsgType[BlockHeight] + system.eventStream.subscribe(probe.ref, classOf[CurrentBlockHeight]) + generateBlocks(1) + awaitCond(probe.expectMsgType[CurrentBlockHeight].blockHeight >= initialBlockHeight + 1) + + // If we watch again after confirmation, the watch instantly triggers. + watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, outputIndex1, RealShortChannelId(3)) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(3), spendingTx1)) + probe.expectNoMessage(100 millis) + }) + } + test("watch for unknown spent transactions") { withWatcher(f => { import f._ diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index a07c49bd0a..e4978e5b6e 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -39,6 +39,7 @@ import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{BlockHeight, CltvExpiryDelta, Features, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TestConstants, TimestampSecond, randomBytes32, randomKey} +import org.scalatest.Inside.inside import scodec.bits._ import scala.concurrent.duration._ @@ -318,9 +319,11 @@ class RouterSpec extends BaseRouterSpec { probe.expectMsg(PublicNode(node_b, 2, publicChannelCapacity * 2)) } - def spendingTx(node1: PublicKey, node2: PublicKey): Transaction = { - val originalFundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0) - Transaction(version = 0, txIn = TxIn(outPoint = OutPoint(originalFundingTx,0), signatureScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))), sequence = 0) :: Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node1)))) :: Nil, lockTime = 0) + def spendingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = { + val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))) + val previousFundingTx = Transaction(version = 2, txIn = Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(previousFundingTx, 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + nextFundingTx } test("properly announce lost channels and nodes") { fixture => @@ -1098,11 +1101,6 @@ class RouterSpec extends BaseRouterSpec { } } - def spliceTx(node1: PublicKey, node2: PublicKey, newCapacity: Satoshi): Transaction = { - val originalFundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0) - Transaction(version = 0, txIn = TxIn(outPoint = OutPoint(originalFundingTx,0), signatureScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))), sequence = 0) :: Nil, txOut = TxOut(newCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0) - } - test("update an existing channel after a splice") { fixture => import fixture._ @@ -1111,15 +1109,23 @@ class RouterSpec extends BaseRouterSpec { val peerConnection = TestProbe() // Channel ab is spent by a splice tx. - val newCapacity = publicChannelCapacity - 100_000.sat - router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx(funding_a, funding_b, newCapacity)) - assert(watcher.expectMsgType[WatchTxConfirmed].minDepth == 12) + val capacity1 = publicChannelCapacity - 100_000.sat + val spliceTx1 = spendingTx(funding_a, funding_b, capacity1) + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx1) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx1.txid) + assert(w.minDepth == 12) + } eventListener.expectNoMessage(100 millis) // Channel ab is spent and confirmed by an RBF of splice tx. - val newCapacity1 = publicChannelCapacity - 100_000.sat - 1000.sat - router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx(funding_a, funding_b, newCapacity1)) - assert(watcher.expectMsgType[WatchTxConfirmed].minDepth == 12) + val capacity2 = publicChannelCapacity - 100_000.sat - 1000.sat + val spliceTx2 = spendingTx(funding_a, funding_b, capacity2) + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx2) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx2.txid) + assert(w.minDepth == 12) + } eventListener.expectNoMessage(100 millis) // The splice of channel ab is announced. @@ -1128,7 +1134,7 @@ class RouterSpec extends BaseRouterSpec { peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn)) peerConnection.expectNoMessage(100 millis) assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn) - watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx(funding_a, funding_b, newCapacity1), UtxoStatus.Unspent))) + watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx2, UtxoStatus.Unspent))) peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn)) peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn)) assert(peerConnection.sender() == router) @@ -1141,24 +1147,23 @@ class RouterSpec extends BaseRouterSpec { val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty) assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) - assert(edge_ab.capacity == newCapacity1 && edge_ba.capacity == newCapacity1) + assert(edge_ab.capacity == capacity2 && edge_ba.capacity == capacity2) // The channel update for the splice is confirmed and the channel is not removed. router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) - eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, newCapacity1, None, None) :: Nil)) + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacity2, None, None) :: Nil)) eventListener.expectMsg(ChannelLost(scid_ab)) peerConnection.expectNoMessage(100 millis) eventListener.expectNoMessage(100 millis) - // The router no longer tracks the parent scid + // The router no longer tracks the parent scid. val probe = TestProbe() awaitAssert({ probe.send(router, GetRouterData) val routerData = probe.expectMsgType[Data] assert(routerData.spentChannels.isEmpty) + assert(!routerData.channels.contains(scid_ab)) }) - } - }