Skip to content

Commit 5abf99e

Browse files
authored
Add router support for batched splices (#2989)
A splice tx could involve more than one parent channel. The Router must track the set of channels spent by a given spend tx until either: 1) If it is a splice, then matching channel announcements are received for each parent and the channels are updated in the routing graph. 2) If the spend tx is deeply buried without receiving matching channel announcements for a parent channel, then it can be removed from the routing graph. If a splice tx spends more than one parent channel between the same nodes, then there's no way to know which new channel announcement corresponds to which parent channel. We simply update the first one found.
1 parent b6aa4cc commit 5abf99e

File tree

5 files changed

+307
-68
lines changed

5 files changed

+307
-68
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,11 +264,11 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
264264
val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId
265265
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
266266
watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2)
267-
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))
267+
stay() using d.copy(spentChannels = d.spentChannels.updated(spendingTx.txid, d.spentChannels.getOrElse(spendingTx.txid, Set.empty) + shortChannelId))
268268

269269
case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
270270
d.spentChannels.get(spendingTx.txid) match {
271-
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId)
271+
case Some(shortChannelIds) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelIds)
272272
case None => stay()
273273
}
274274

@@ -771,7 +771,7 @@ object Router {
771771
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
772772
graphWithBalances: GraphWithBalanceEstimates,
773773
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
774-
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
774+
spentChannels: Map[TxId, Set[RealShortChannelId]], // transactions that spend funding txs that are not yet deeply buried
775775
) {
776776
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
777777
// let's assume this is a real scid

eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -113,18 +113,16 @@ object Validation {
113113
log.debug("validation successful for shortChannelId={}", c.shortChannelId)
114114
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
115115
val capacity = tx.txOut(outputIndex).amount
116-
d0.spentChannels.get(tx.txid) match {
117-
case Some(parentScid) =>
118-
d0.channels.get(parentScid) match {
119-
case Some(parentChannel) =>
120-
Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel))
121-
case None =>
122-
log.error("spent parent channel shortChannelId={} not found for splice shortChannelId={}", parentScid, c.shortChannelId)
123-
val spendingTxs = d0.spentChannels.filter(_._2 == parentScid).keySet
124-
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
125-
val d1 = d0.copy(spentChannels = d0.spentChannels -- spendingTxs)
126-
Some(addPublicChannel(d1, nodeParams, watcher, c, tx.txid, capacity, None))
127-
}
116+
// A single transaction may splice multiple channels (batching), in which case we have multiple parent
117+
// channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter.
118+
// We only need to update one of the parent channels between the same set of nodes to correctly update
119+
// our graph.
120+
val parentChannel_opt = d0.spentChannels
121+
.getOrElse(tx.txid, Set.empty)
122+
.flatMap(d0.channels.get)
123+
.find(parent => parent.nodeId1 == c.nodeId1 && parent.nodeId2 == c.nodeId2)
124+
parentChannel_opt match {
125+
case Some(parentChannel) => Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel))
128126
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
129127
}
130128
}
@@ -191,9 +189,12 @@ object Validation {
191189
// we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid
192190
log.debug("updating the graph for shortChannelId={}", newPubChan.shortChannelId)
193191
val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity)
194-
val spendingTxs = d.spentChannels.filter(_._2 == parentChannel.shortChannelId).keySet
195-
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
196-
val spentChannels1 = d.spentChannels -- spendingTxs
192+
val spentChannels1 = d.spentChannels.collect {
193+
case (txId, parentScids) if (parentScids - parentChannel.shortChannelId).nonEmpty =>
194+
txId -> (parentScids - parentChannel.shortChannelId)
195+
}
196+
// No need to keep watching transactions that have been removed from spentChannels.
197+
(d.spentChannels.keySet -- spentChannels1.keys).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
197198
d.copy(
198199
// we also add the splice scid -> channelId and remove the parent scid -> channelId mappings
199200
channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId,
@@ -267,34 +268,42 @@ object Validation {
267268
} else d1
268269
}
269270

270-
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
271+
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelIds: Set[RealShortChannelId])(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
271272
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
272-
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get
273-
log.info("funding tx for channelId={} was spent", shortChannelId)
273+
val lostChannels = shortChannelIds.flatMap(shortChannelId => d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)))
274+
log.info("funding tx for channelIds={} was spent", shortChannelIds.mkString(","))
274275
// we need to remove nodes that aren't tied to any channels anymore
275-
val channels1 = d.channels - shortChannelId
276-
val prunedChannels1 = d.prunedChannels - shortChannelId
277-
val lostNodes = Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values))
276+
val channels1 = d.channels -- shortChannelIds
277+
val prunedChannels1 = d.prunedChannels -- shortChannelIds
278+
val lostNodes = lostChannels.flatMap(lostChannel => Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values)))
278279
// let's clean the db and send the events
279-
log.info("pruning shortChannelId={} (spent)", shortChannelId)
280-
db.removeChannel(shortChannelId) // NB: this also removes channel updates
280+
log.info("pruning shortChannelIds={} (spent)", shortChannelIds.mkString(","))
281+
shortChannelIds.foreach(db.removeChannel(_)) // NB: this also removes channel updates
281282
// we also need to remove updates from the graph
282-
val graphWithBalances1 = d.graphWithBalances
283-
.removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2))
283+
val graphWithBalances1 = lostChannels.foldLeft(d.graphWithBalances) { (graph, lostChannel) =>
284+
graph.removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2))
285+
}
284286
// we notify front nodes
285-
ctx.system.eventStream.publish(ChannelLost(shortChannelId))
287+
shortChannelIds.foreach(shortChannelId => ctx.system.eventStream.publish(ChannelLost(shortChannelId)))
286288
lostNodes.foreach {
287289
nodeId =>
288290
log.info("pruning nodeId={} (spent)", nodeId)
289291
db.removeNode(nodeId)
290292
ctx.system.eventStream.publish(NodeLost(nodeId))
291293
}
292-
// we no longer need to track this or alternative transactions that spent the parent channel
293-
// either this channel was really closed, or it was spliced and the announcement was not received in time
294-
// we will re-add a spliced channel as a new channel later when we receive the announcement
295-
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
296-
val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet
297-
// stop watching the spending txs that will never confirm, we already got confirmations for this spending tx
294+
lostChannels.foreach {
295+
lostChannel =>
296+
// we no longer need to track this or alternative transactions that spent the parent channel
297+
// either this channel was really closed, or it was spliced and the announcement was not received in time
298+
// we will re-add a spliced channel as a new channel later when we receive the announcement
299+
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
300+
}
301+
302+
// We may have received RBF candidates for this splice: we can find them by looking at transactions that spend one
303+
// of the channels we're removing (note that they may spend a slightly different set of channels).
304+
// Those transactions cannot confirm anymore (they have been double-spent by the current one), so we should stop
305+
// watching them.
306+
val spendingTxs = d.spentChannels.filter(_._2.intersect(shortChannelIds).nonEmpty).keySet
298307
(spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
299308
val spentChannels1 = d.spentChannels -- spendingTxs
300309
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1)
@@ -599,7 +608,14 @@ object Validation {
599608
log.debug("this is a known pruned local channel, processing channel_update for channelId={} scid={}", lcu.channelId, ann.shortChannelId)
600609
handleChannelUpdate(d, db, nodeParams.currentBlockHeight, Left(lcu))
601610
case Some(ann) =>
602-
val d1 = d.spentChannels.get(ann.fundingTxId).flatMap(parentScid => d.channels.get(parentScid)) match {
611+
// A single transaction may splice multiple channels (batching), in which case we have multiple parent
612+
// channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter.
613+
// We only need to update one of the parent channels between the same set of nodes to correctly update
614+
// our graph.
615+
val d1 = d.spentChannels
616+
.getOrElse(ann.fundingTxId, Set.empty)
617+
.flatMap(d.channels.get)
618+
.find(parent => parent.nodeId1 == ann.announcement.nodeId1 && parent.nodeId2 == ann.announcement.nodeId2) match {
603619
case Some(parentChannel) =>
604620
// This is a splice for which we haven't processed the (local) channel_announcement yet.
605621
log.debug("processing channel_announcement for local splice with fundingTxId={} channelId={} scid={} (previous={})", ann.fundingTxId, lcu.channelId, ann.shortChannelId, parentChannel.shortChannelId)

eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,20 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {
5252
val channels = getPeerChannels(alice, bob.nodeId) ++ getPeerChannels(bob, carol.nodeId)
5353
assert(channels.map(_.data.channelId).toSet == Set(channelId_ab, channelId_bc))
5454

55-
eventually {
55+
val scid_ab = eventually {
5656
assert(getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx])
5757
assert(getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx])
58+
val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get
59+
// Wait for Alice to receive both initial local channel updates.
60+
inside(getRouterData(alice)) { routerData =>
61+
val channel_ab = routerData.channels(scid_ab)
62+
val receivedUpdates = Seq(channel_ab.update_1_opt, channel_ab.update_2_opt).flatten
63+
assert(receivedUpdates.count(_.shortChannelId == scid_ab) == 2)
64+
}
65+
scid_ab
5866
}
5967

6068
// We splice in to increase the capacity of the alice->bob channel.
61-
val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get
6269
val spliceTxId = spliceIn(alice, channelId_ab, 100_000 sat, None).asInstanceOf[RES_SPLICE].fundingTxId
6370

6471
// The announcement for the splice transaction and the corresponding channel updates are broadcast.
@@ -73,10 +80,12 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {
7380
val splice_scid_ab = channelData_alice.commitments.latest.shortChannelId_opt.get
7481
assert(splice_scid_ab != scid_ab)
7582
assert(channelData_bob.commitments.latest.shortChannelId_opt.contains(splice_scid_ab))
83+
val scid_bc = getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get
7684

7785
// Alice creates a channel_announcement for the splice transaction and updates the graph.
7886
val spliceAnn = inside(getRouterData(alice)) { routerData =>
79-
assert(routerData.channels.contains(splice_scid_ab))
87+
assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc))
88+
assert(routerData.spentChannels.isEmpty)
8089
val channel_ab = routerData.channels(splice_scid_ab)
8190
assert(channel_ab.capacity == 200_000.sat)
8291
assert(channel_ab.update_1_opt.nonEmpty && channel_ab.update_2_opt.nonEmpty)
@@ -91,6 +100,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {
91100

92101
// Bob also creates a channel_announcement for the splice transaction and updates the graph.
93102
inside(getRouterData(bob)) { routerData =>
103+
assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc))
104+
assert(routerData.spentChannels.isEmpty)
94105
assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn))
95106
routerData.channels.get(splice_scid_ab).foreach(c => {
96107
assert(c.capacity == 200_000.sat)
@@ -106,6 +117,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience {
106117

107118
// The channel_announcement for the splice propagates to Carol.
108119
inside(getRouterData(carol)) { routerData =>
120+
assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc))
121+
assert(routerData.spentChannels.isEmpty)
109122
assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn))
110123
routerData.channels.get(splice_scid_ab).foreach(c => {
111124
assert(c.capacity == 200_000.sat)

eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,44 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi
231231
}
232232
}
233233

234+
def addChannel(router: ActorRef, watcher: TestProbe, scid: RealShortChannelId, priv1: PrivateKey, priv2: PrivateKey, priv_funding1: PrivateKey, priv_funding2: PrivateKey): (ChannelAnnouncement, ChannelUpdate, ChannelUpdate) = {
235+
val ann = channelAnnouncement(scid, priv1, priv2, priv_funding1, priv_funding2)
236+
val pub1 = priv1.publicKey
237+
val pub2 = priv2.publicKey
238+
val update1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv1, pub2, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum)
239+
val update2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv2, pub1, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum)
240+
val pub_funding1 = priv_funding1.publicKey
241+
val pub_funding2 = priv_funding2.publicKey
242+
assert(ChannelDesc(update1, ann) == ChannelDesc(ann.shortChannelId, pub1, pub2))
243+
val sender1 = TestProbe()
244+
val peerConnection = TestProbe()
245+
peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true }
246+
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann))
247+
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update1))
248+
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update2))
249+
assert(watcher.expectMsgType[ValidateRequest].ann == ann)
250+
watcher.send(router, ValidateResult(ann, Right((Transaction(version = 2, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(pub_funding1, pub_funding2)))) :: Nil, lockTime = 0), UtxoStatus.Unspent))))
251+
assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == scid)
252+
peerConnection.expectMsgAllOf(
253+
GossipDecision.Accepted(ann),
254+
GossipDecision.Accepted(update1),
255+
GossipDecision.Accepted(update2)
256+
)
257+
peerConnection.expectNoMessage(100 millis)
258+
awaitCond({
259+
sender1.send(router, GetNodes)
260+
val nodes = sender1.expectMsgType[Iterable[NodeAnnouncement]]
261+
sender1.send(router, GetChannels)
262+
val channels = sender1.expectMsgType[Iterable[ChannelAnnouncement]].toSeq
263+
sender1.send(router, GetChannelUpdates)
264+
val updates = sender1.expectMsgType[Iterable[ChannelUpdate]].toSeq
265+
nodes.exists(_.nodeId == pub1) && nodes.exists(_.nodeId == pub2) &&
266+
channels.contains(ann) &&
267+
updates.contains(update1) && updates.contains(update2)
268+
}, max = 10 seconds, interval = 1 second)
269+
(ann, update1, update2)
270+
}
271+
234272
}
235273

236274
object BaseRouterSpec {

0 commit comments

Comments
 (0)