Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework TxPublisher #1844

Merged
merged 6 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ eclair {
fulfill-safety-before-timeout-blocks = 24
min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks
max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect
max-tx-publish-retry-delay = 60 seconds // we add a random delay before retrying failed transaction publication

fee-base-msat = 1000
fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%)
Expand Down
2 changes: 2 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
fulfillSafetyBeforeTimeout: CltvExpiryDelta,
minFinalExpiryDelta: CltvExpiryDelta,
maxBlockProcessingDelay: FiniteDuration,
maxTxPublishRetryDelay: FiniteDuration,
htlcMinimum: MilliSatoshi,
toRemoteDelay: CltvExpiryDelta,
maxToLocalDelay: CltvExpiryDelta,
Expand Down Expand Up @@ -339,6 +340,7 @@ object NodeParams extends Logging {
fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout,
minFinalExpiryDelta = minFinalExpiryDelta,
maxBlockProcessingDelay = FiniteDuration(config.getDuration("max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
maxTxPublishRetryDelay = FiniteDuration(config.getDuration("max-tx-publish-retry-delay").getSeconds, TimeUnit.SECONDS),
htlcMinimum = htlcMinimum,
toRemoteDelay = CltvExpiryDelta(config.getInt("to-remote-delay-blocks")),
maxToLocalDelay = CltvExpiryDelta(config.getInt("max-to-local-delay-blocks")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient.{FundTransactionOptions, FundTransactionResponse, SignTransactionResponse, toSatoshi}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient, JsonRPCError}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
import fr.acinq.eclair.blockchain.fee.{FeeratePerKB, FeeratePerKw}
import fr.acinq.eclair.transactions.Transactions
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -61,7 +61,7 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC
val f = signTransaction(tx)
// if signature fails (e.g. because wallet is encrypted) we need to unlock the utxos
f.recoverWith { case _ =>
unlockOutpoints(tx.txIn.map(_.outPoint))
bitcoinClient.unlockOutpoints(tx.txIn.map(_.outPoint))
.recover { case t: Throwable => // no-op, just add a log in case of failure
logger.warn(s"Cannot unlock failed transaction's UTXOs txid=${tx.txid}", t)
t
Expand Down Expand Up @@ -152,41 +152,12 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC
}
}

override def rollback(tx: Transaction): Future[Boolean] = unlockOutpoints(tx.txIn.map(_.outPoint)) // we unlock all utxos used by the tx
override def rollback(tx: Transaction): Future[Boolean] = bitcoinClient.unlockOutpoints(tx.txIn.map(_.outPoint)) // we unlock all utxos used by the tx

override def doubleSpent(tx: Transaction): Future[Boolean] = bitcoinClient.doubleSpent(tx)

/**
* @param outPoints outpoints to unlock
* @return true if all outpoints were successfully unlocked, false otherwise
*/
private def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = {
// we unlock utxos one by one and not as a list as it would fail at the first utxo that is not actually locked and the rest would not be processed
val futures = outPoints
.map(outPoint => Utxo(outPoint.txid, outPoint.index))
.map(utxo => rpcClient
.invoke("lockunspent", true, List(utxo))
.mapTo[JBool]
.transformWith {
case Success(JBool(result)) => Future.successful(result)
case Failure(JsonRPCError(error)) if error.message.contains("expected locked output") =>
Future.successful(true) // we consider that the outpoint was successfully unlocked (since it was not locked to begin with)
case Failure(t) =>
logger.warn(s"Cannot unlock utxo=$utxo", t)
Future.successful(false)
})
val future = Future.sequence(futures)
// return true if all outpoints were unlocked false otherwise
future.map(_.forall(b => b))
}

}

object BitcoinCoreWallet {

// @formatter:off
case class Utxo(txid: ByteVector32, vout: Long)
case class WalletTransaction(address: String, amount: Satoshi, fees: Satoshi, blockHash: ByteVector32, confirmations: Long, txid: ByteVector32, timestamp: Long)
// @formatter:on

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import fr.acinq.eclair.blockchain.Monitoring.Metrics
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog
import fr.acinq.eclair.channel.TxPublisher.PublishTx
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import fr.acinq.eclair.{KamonExt, ShortChannelId}
import org.json4s.JsonAST._
Expand Down Expand Up @@ -149,8 +148,8 @@ object ZmqWatcher {
case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: ByteVector32, minDepth: Long) extends WatchConfirmed[WatchTxConfirmedTriggered]
case class WatchTxConfirmedTriggered(blockHeight: Int, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered

case class WatchParentTxConfirmed(replyTo: ActorRef[WatchParentTxConfirmedTriggered], txId: ByteVector32, minDepth: Long, childTx: PublishTx) extends WatchConfirmed[WatchParentTxConfirmedTriggered]
case class WatchParentTxConfirmedTriggered(blockHeight: Int, txIndex: Int, tx: Transaction, childTx: PublishTx) extends WatchConfirmedTriggered
case class WatchParentTxConfirmed(replyTo: ActorRef[WatchParentTxConfirmedTriggered], txId: ByteVector32, minDepth: Long) extends WatchConfirmed[WatchParentTxConfirmedTriggered]
case class WatchParentTxConfirmedTriggered(blockHeight: Int, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered

// TODO: not implemented yet: notify me if confirmation number gets below minDepth?
case class WatchFundingLost(replyTo: ActorRef[WatchFundingLostTriggered], txId: ByteVector32, minDepth: Long) extends Watch[WatchFundingLostTriggered]
Expand Down Expand Up @@ -378,7 +377,7 @@ private class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client
case w: WatchFundingConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchFundingConfirmedTriggered(height, index, tx))
case w: WatchFundingDeeplyBuried => context.self ! TriggerEvent(w.replyTo, w, WatchFundingDeeplyBuriedTriggered(height, index, tx))
case w: WatchTxConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchTxConfirmedTriggered(height, index, tx))
case w: WatchParentTxConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchParentTxConfirmedTriggered(height, index, tx, w.childTx))
case w: WatchParentTxConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchParentTxConfirmedTriggered(height, index, tx))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{GetTxWithMetaResponse, Ut
import fr.acinq.eclair.blockchain.fee.{FeeratePerKB, FeeratePerKw}
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import grizzled.slf4j.Logging
import org.json4s.Formats
import org.json4s.JsonAST._
import scodec.bits.ByteVector

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.{Failure, Success, Try}

/**
* Created by PM on 26/04/2016.
Expand All @@ -39,7 +40,7 @@ import scala.util.Try
* Note that all wallet utilities (signing transactions, setting fees, locking outputs, etc) can be found in
* [[fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet]].
*/
class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) extends Logging {

import ExtendedBitcoinClient._

Expand Down Expand Up @@ -151,6 +152,30 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
getRawTransaction(tx.txid).map(_ => tx.txid).recoverWith { case _ => Future.failed(e) }
}

/**
* @param outPoints outpoints to unlock.
* @return true if all outpoints were successfully unlocked, false otherwise.
*/
def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = {
// we unlock utxos one by one and not as a list as it would fail at the first utxo that is not actually locked and the rest would not be processed
val futures = outPoints
.map(outPoint => Utxo(outPoint.txid, outPoint.index))
.map(utxo => rpcClient
.invoke("lockunspent", true, List(utxo))
.mapTo[JBool]
.transformWith {
case Success(JBool(result)) => Future.successful(result)
case Failure(JsonRPCError(error)) if error.message.contains("expected locked output") =>
Future.successful(true) // we consider that the outpoint was successfully unlocked (since it was not locked to begin with)
case Failure(t) =>
logger.warn(s"cannot unlock utxo=$utxo:", t)
Future.successful(false)
})
val future = Future.sequence(futures)
// return true if all outpoints were unlocked false otherwise
future.map(_.forall(b => b))
}

def isTransactionOutputSpendable(txid: ByteVector32, outputIndex: Int, includeMempool: Boolean)(implicit ec: ExecutionContext): Future[Boolean] =
for {
json <- rpcClient.invoke("gettxout", txid, outputIndex, includeMempool)
Expand Down Expand Up @@ -206,8 +231,9 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
def getMempool()(implicit ec: ExecutionContext): Future[Seq[Transaction]] =
for {
txids <- rpcClient.invoke("getrawmempool").map(json => json.extract[List[String]].map(ByteVector32.fromValidHex))
txs <- Future.sequence(txids.map(getTransaction(_)))
} yield txs
// NB: if a transaction is evicted before we've called getTransaction, we need to ignore it instead of failing.
txs <- Future.sequence(txids.map(getTransaction(_).map(Some(_)).recover { case _ => None }))
} yield txs.flatten

def getMempoolTx(txid: ByteVector32)(implicit ec: ExecutionContext): Future[MempoolTx] = {
rpcClient.invoke("getmempoolentry", txid).map(json => {
Expand Down Expand Up @@ -296,6 +322,8 @@ object ExtendedBitcoinClient {
*/
case class MempoolTx(txid: ByteVector32, vsize: Long, weight: Long, replaceable: Boolean, fees: Satoshi, ancestorCount: Int, ancestorFees: Satoshi, descendantCount: Int, descendantFees: Satoshi)

case class Utxo(txid: ByteVector32, vout: Long)

def toSatoshi(btcAmount: BigDecimal): Satoshi = Satoshi(btcAmount.bigDecimal.scaleByPowerOfTen(8).longValue)

}
36 changes: 18 additions & 18 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
import fr.acinq.eclair.channel.Monitoring.Metrics.ProcessMessage
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx, SetChannelId, SignAndPublishTx}
import fr.acinq.eclair.channel.publish.TxPublisher
import fr.acinq.eclair.channel.publish.TxPublisher.{PublishRawTx, PublishReplaceableTx, PublishTx, SetChannelId}
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
Expand Down Expand Up @@ -63,7 +64,7 @@ object Channel {

case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], bitcoinClient: ExtendedBitcoinClient) extends TxPublisherFactory {
override def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command] = {
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, watcher, bitcoinClient)).onFailure(typed.SupervisorStrategy.restart), "tx-publisher")
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient, watcher))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher")
}
}

Expand Down Expand Up @@ -1380,7 +1381,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, penaltyTxs) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(claimTx))
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(claimTx, None))
penaltyTxs.foreach(claimTx => blockchain ! WatchOutputSpent(self, tx.txid, claimTx.input.outPoint.index.toInt, hints = Set(claimTx.tx.txid)))
rev1
}
Expand All @@ -1394,7 +1395,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// If the tx is one of our HTLC txs, we now publish a 3rd-stage claim-htlc-tx that claims its output.
val (localCommitPublished1, claimHtlcTx_opt) = Closing.claimLocalCommitHtlcTxOutput(localCommitPublished, keyManager, d.commitments, tx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
claimHtlcTx_opt.foreach(claimHtlcTx => {
txPublisher ! PublishRawTx(claimHtlcTx)
txPublisher ! PublishRawTx(claimHtlcTx, None)
blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.minDepthBlocks)
})
Closing.updateLocalCommitPublished(localCommitPublished1, tx)
Expand Down Expand Up @@ -2046,7 +2047,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Some(fundingTx) =>
// if we are funder, we never give up
log.info(s"republishing the funding tx...")
txPublisher ! PublishRawTx(fundingTx, "funding-tx")
txPublisher ! PublishRawTx(fundingTx, fundingTx.txIn.head.outPoint, "funding-tx", None)
// we also check if the funding tx has been double-spent
checkDoubleSpent(fundingTx)
context.system.scheduler.scheduleOnce(1 day, blockchain.toClassic, GetTxWithMeta(self, txid))
Expand Down Expand Up @@ -2199,7 +2200,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

private def doPublish(closingTx: ClosingTx): Unit = {
txPublisher ! PublishRawTx(closingTx)
txPublisher ! PublishRawTx(closingTx, None)
blockchain ! WatchTxConfirmed(self, closingTx.tx.txid, nodeParams.minDepthBlocks)
}

Expand Down Expand Up @@ -2229,12 +2230,9 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
* This helper method will publish txs only if they haven't yet reached minDepth
*/
private def publishIfNeeded(txs: Iterable[PublishTx], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
val (skip, process) = txs.partition(publishTx => Closing.inputsAlreadySpent(publishTx.tx, irrevocablySpent))
process.foreach { publishTx =>
log.info(s"publishing txid=${publishTx.tx.txid}")
txPublisher ! publishTx
}
skip.foreach(publishTx => log.info(s"no need to republish txid=${publishTx.tx.txid}, it has already been confirmed"))
val (skip, process) = txs.partition(publishTx => Closing.inputAlreadySpent(publishTx.input, irrevocablySpent))
process.foreach { publishTx => txPublisher ! publishTx }
skip.foreach(publishTx => log.info("no need to republish tx spending {}:{}, it has already been confirmed", publishTx.input.txid, publishTx.input.index))
}

/**
Expand Down Expand Up @@ -2264,13 +2262,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(localCommitPublished: LocalCommitPublished, commitments: Commitments): Unit = {
import localCommitPublished._

val commitInput = commitments.commitInput.outPoint
val publishQueue = commitments.commitmentFormat match {
case Transactions.DefaultCommitmentFormat =>
List(PublishRawTx(commitTx, "commit-tx")) ++ (claimMainDelayedOutputTx ++ htlcTxs.values.flatten ++ claimHtlcDelayedTxs).map(tx => PublishRawTx(tx))
val redeemableHtlcTxs = htlcTxs.values.flatten.map(tx => PublishRawTx(tx, Some(commitTx.txid)))
List(PublishRawTx(commitTx, commitInput, "commit-tx", None)) ++ (claimMainDelayedOutputTx.map(tx => PublishRawTx(tx, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx, None)))
case Transactions.AnchorOutputsCommitmentFormat =>
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => SignAndPublishTx(tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => SignAndPublishTx(tx, commitments) }
List(PublishRawTx(commitTx, "commit-tx")) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(tx)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx))
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => PublishReplaceableTx(tx, commitments) }
List(PublishRawTx(commitTx, commitInput, "commit-tx", None)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(tx, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx, None))
}
publishIfNeeded(publishQueue, irrevocablySpent)

Expand Down Expand Up @@ -2333,7 +2333,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = {
import remoteCommitPublished._

val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx(tx))
val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx(tx, None))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down Expand Up @@ -2372,7 +2372,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = {
import revokedCommitPublished._

val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(tx))
val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(tx, None))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import akka.actor.{ActorRef, PossiblyHarmful}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.TxPublisher.PublishTx
import fr.acinq.eclair.payment.OutgoingPacket.Upstream
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.CommitmentSpec
Expand Down
Loading