Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use correct type for parameters in ExtendedBitcoinClient #1248

Merged
merged 2 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@ package fr.acinq.eclair.blockchain.bitcoind
import fr.acinq.bitcoin._
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, Error, JsonRPCError}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, Error, ExtendedBitcoinClient, JsonRPCError}
import fr.acinq.eclair.transactions.Transactions
import grizzled.slf4j.Logging
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.jackson.Serialization
import scodec.bits.ByteVector

import scala.compat.Platform
import scala.concurrent.{ExecutionContext, Future}

/**
Expand All @@ -37,6 +34,8 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC

import BitcoinCoreWallet._

val bitcoinClient = new ExtendedBitcoinClient(rpcClient)

def fundTransaction(hex: String, lockUnspents: Boolean, feeRatePerKw: Long): Future[FundTransactionResponse] = {
val feeRatePerKB = BigDecimal(feerateKw2KB(feeRatePerKw))
rpcClient.invoke("fundrawtransaction", hex, Options(lockUnspents, feeRatePerKB.bigDecimal.scaleByPowerOfTen(-8))).map(json => {
Expand All @@ -62,15 +61,9 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC

def signTransaction(tx: Transaction): Future[SignTransactionResponse] = signTransaction(Transaction.write(tx).toHex)

def getTransaction(txid: ByteVector32): Future[Transaction] = rpcClient.invoke("getrawtransaction", txid.toString()) collect { case JString(hex) => Transaction.read(hex) }

def publishTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[String] = publishTransaction(Transaction.write(tx).toHex)

def publishTransaction(hex: String)(implicit ec: ExecutionContext): Future[String] = rpcClient.invoke("sendrawtransaction", hex) collect { case JString(txid) => txid }

def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = rpcClient.invoke("lockunspent", true, outPoints.toList.map(outPoint => Utxo(outPoint.txid.toString, outPoint.index))) collect { case JBool(result) => result }
def publishTransaction(tx: Transaction)(implicit ec: ExecutionContext): Future[String] = bitcoinClient.publishTransaction(tx)

def isTransactionOutputSpendable(txId: String, outputIndex: Int, includeMempool: Boolean)(implicit ec: ExecutionContext): Future[Boolean] = rpcClient.invoke("gettxout", txId, outputIndex, includeMempool) collect { case j => j != JNull }
def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = rpcClient.invoke("lockunspent", true, outPoints.toList.map(outPoint => Utxo(outPoint.txid, outPoint.index))) collect { case JBool(result) => result }

override def getBalance: Future[Satoshi] = rpcClient.invoke("getbalance") collect { case JDecimal(balance) => Satoshi(balance.bigDecimal.scaleByPowerOfTen(8).longValue()) }

Expand Down Expand Up @@ -111,15 +104,15 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC
.map(_ => true) // if bitcoind says OK, then we consider the tx successfully published
.recoverWith { case JsonRPCError(e) =>
logger.warn(s"txid=${tx.txid} error=$e")
getTransaction(tx.txid).map(_ => true).recover { case _ => false } // if we get a parseable error from bitcoind AND the tx is NOT in the mempool/blockchain, then we consider that the tx was not published
bitcoinClient.getTransaction(tx.txid).map(_ => true).recover { case _ => false } // if we get a parseable error from bitcoind AND the tx is NOT in the mempool/blockchain, then we consider that the tx was not published
}
.recover { case _ => true } // in all other cases we consider that the tx has been published

override def rollback(tx: Transaction): Future[Boolean] = unlockOutpoints(tx.txIn.map(_.outPoint)) // we unlock all utxos used by the tx

override def doubleSpent(tx: Transaction): Future[Boolean] =
for {
exists <- getTransaction(tx.txid)
exists <- bitcoinClient.getTransaction(tx.txid)
.map(_ => true) // we have found the transaction
.recover {
case JsonRPCError(Error(_, message)) if message.contains("index") =>
Expand All @@ -135,7 +128,7 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC
// if the tx wasn't in the blockchain and one of it's input has been spent, it is double-spent
// NB: we don't look in the mempool, so it means that we will only consider that the tx has been double-spent if
// the overriding transaction has been confirmed at least once
Future.sequence(tx.txIn.map(txIn => isTransactionOutputSpendable(txIn.outPoint.txid.toHex, txIn.outPoint.index.toInt, includeMempool = false))).map(_.exists(_ == false))
Future.sequence(tx.txIn.map(txIn => bitcoinClient.isTransactionOutputSpendable(txIn.outPoint.txid, txIn.outPoint.index.toInt, includeMempool = false))).map(_.exists(_ == false))
}
} yield doublespent

Expand All @@ -145,7 +138,7 @@ object BitcoinCoreWallet {

// @formatter:off
case class Options(lockUnspents: Boolean, feeRate: BigDecimal)
case class Utxo(txid: String, vout: Long)
case class Utxo(txid: ByteVector32, vout: Long)
case class FundTransactionResponse(tx: Transaction, changepos: Int, fee: Satoshi)
case class SignTransactionResponse(tx: Transaction, complete: Boolean)
// @formatter:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,25 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit
w match {
case WatchSpentBasic(_, txid, outputIndex, _, _) =>
// not: we assume parent tx was published, we just need to make sure this particular output has not been spent
client.isTransactionOutputSpendable(txid.toString(), outputIndex, includeMempool = true).collect {
client.isTransactionOutputSpendable(txid, outputIndex, includeMempool = true).collect {
case false =>
log.info(s"output=$outputIndex of txid=$txid has already been spent")
self ! TriggerEvent(w, WatchEventSpentBasic(w.event))
}

case WatchSpent(_, txid, outputIndex, _, _) =>
// first let's see if the parent tx was published or not
client.getTxConfirmations(txid.toString()).collect {
client.getTxConfirmations(txid).collect {
case Some(_) =>
// parent tx was published, we need to make sure this particular output has not been spent
client.isTransactionOutputSpendable(txid.toString(), outputIndex, includeMempool = true).collect {
client.isTransactionOutputSpendable(txid, outputIndex, includeMempool = true).collect {
case false =>
log.info(s"$txid:$outputIndex has already been spent, looking for the spending tx in the mempool")
client.getMempool().map { mempoolTxs =>
mempoolTxs.filter(tx => tx.txIn.exists(i => i.outPoint.txid == txid && i.outPoint.index == outputIndex)) match {
case Nil =>
log.warning(s"$txid:$outputIndex has already been spent, spending tx not in the mempool, looking in the blockchain...")
client.lookForSpendingTx(None, txid.toString(), outputIndex).map { tx =>
client.lookForSpendingTx(None, txid, outputIndex).map { tx =>
log.warning(s"found the spending tx of $txid:$outputIndex in the blockchain: txid=${tx.txid}")
self ! NewTransaction(tx)
}
Expand Down Expand Up @@ -187,7 +187,7 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit

case ValidateRequest(ann) => client.validate(ann).pipeTo(sender)

case GetTxWithMeta(txid) => client.getTransactionMeta(txid.toString()).pipeTo(sender)
case GetTxWithMeta(txid) => client.getTransactionMeta(txid).pipeTo(sender)

case Terminated(channel) =>
// we remove watches associated to dead actor
Expand Down Expand Up @@ -219,10 +219,10 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit
log.debug("checking confirmations of txid={}", w.txId)
// NB: this is very inefficient since internally we call `getrawtransaction` three times, but it doesn't really
// matter because this only happens once, when the watched transaction has reached min_depth
client.getTxConfirmations(w.txId.toString).flatMap {
client.getTxConfirmations(w.txId).flatMap {
case Some(confirmations) if confirmations >= w.minDepth =>
client.getTransaction(w.txId.toString).flatMap { tx =>
client.getTransactionShortId(w.txId.toString).map {
client.getTransaction(w.txId).flatMap { tx =>
client.getTransactionShortId(w.txId).map {
case (height, index) => self ! TriggerEvent(w, WatchEventConfirmed(w.event, height, index, tx))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ package fr.acinq.eclair.blockchain.bitcoind.rpc

import com.softwaremill.sttp._
import com.softwaremill.sttp.json4s._
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.KamonExt
import fr.acinq.eclair.blockchain.Monitoring.{Metrics, Tags}
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JValue
import org.json4s.{CustomSerializer, DefaultFormats}
import org.json4s.JsonAST.{JString, JValue}
import org.json4s.jackson.Serialization

import scala.concurrent.{ExecutionContext, Future}

class BasicBitcoinJsonRPCClient(user: String, password: String, host: String = "127.0.0.1", port: Int = 8332, ssl: Boolean = false)(implicit http: SttpBackend[Future, Nothing]) extends BitcoinJsonRPCClient {

// necessary to properly serialize ByteVector32 into String readable by bitcoind
object ByteVector32Serializer extends CustomSerializer[ByteVector32](_ => ( {
null
}, {
case x: ByteVector32 => JString(x.toHex)
}))
implicit val formats = DefaultFormats.withBigDecimal + ByteVector32Serializer
private val scheme = if (ssl) "https" else "http"
private val serviceUri = uri"$scheme://$host:$port/wallet/" // wallet/ specifies to use the default bitcoind wallet, named ""
implicit val formats = DefaultFormats.withBigDecimal
implicit val serialization = Serialization

override def invoke(method: String, params: Any*)(implicit ec: ExecutionContext): Future[JValue] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,81 +34,81 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {

implicit val formats = org.json4s.DefaultFormats

def getTxConfirmations(txId: String)(implicit ec: ExecutionContext): Future[Option[Int]] =
rpcClient.invoke("getrawtransaction", txId, 1) // we choose verbose output to get the number of confirmations
def getTxConfirmations(txid: ByteVector32)(implicit ec: ExecutionContext): Future[Option[Int]] =
rpcClient.invoke("getrawtransaction", txid, 1) // we choose verbose output to get the number of confirmations
.map(json => Some((json \ "confirmations").extractOrElse[Int](0)))
.recover {
case t: JsonRPCError if t.error.code == -5 => None
}

def getTxBlockHash(txId: String)(implicit ec: ExecutionContext): Future[Option[String]] =
rpcClient.invoke("getrawtransaction", txId, 1) // we choose verbose output to get the number of confirmations
.map(json => (json \ "blockhash").extractOpt[String])
def getTxBlockHash(txid: ByteVector32)(implicit ec: ExecutionContext): Future[Option[ByteVector32]] =
rpcClient.invoke("getrawtransaction", txid, 1) // we choose verbose output to get the number of confirmations
.map(json => (json \ "blockhash").extractOpt[String].map(ByteVector32.fromValidHex))
.recover {
case t: JsonRPCError if t.error.code == -5 => None
}

def lookForSpendingTx(blockhash_opt: Option[String], txid: String, outputIndex: Int)(implicit ec: ExecutionContext): Future[Transaction] =
def lookForSpendingTx(blockhash_opt: Option[ByteVector32], txid: ByteVector32, outputIndex: Int)(implicit ec: ExecutionContext): Future[Transaction] =
for {
blockhash <- blockhash_opt match {
case Some(b) => Future.successful(b)
case None => rpcClient.invoke("getbestblockhash") collect { case JString(b) => b }
case None => rpcClient.invoke("getbestblockhash") collect { case JString(b) => ByteVector32.fromValidHex(b) }
}
// with a verbosity of 0, getblock returns the raw serialized block
block <- rpcClient.invoke("getblock", blockhash, 0).collect { case JString(b) => Block.read(b) }
prevblockhash = block.header.hashPreviousBlock.reverse.toHex
res <- block.tx.find(tx => tx.txIn.exists(i => i.outPoint.txid.toString() == txid && i.outPoint.index == outputIndex)) match {
prevblockhash = block.header.hashPreviousBlock.reverse
res <- block.tx.find(tx => tx.txIn.exists(i => i.outPoint.txid == txid && i.outPoint.index == outputIndex)) match {
case None => lookForSpendingTx(Some(prevblockhash), txid, outputIndex)
case Some(tx) => Future.successful(tx)
}
} yield res

def getMempool()(implicit ec: ExecutionContext): Future[Seq[Transaction]] =
for {
txids <- rpcClient.invoke("getrawmempool").map(json => json.extract[List[String]])
txids <- rpcClient.invoke("getrawmempool").map(json => json.extract[List[String]].map(ByteVector32.fromValidHex))
txs <- Future.sequence(txids.map(getTransaction(_)))
} yield txs

/**
* @param txId
* @param txid
* @param ec
* @return
*/
def getRawTransaction(txId: String)(implicit ec: ExecutionContext): Future[String] =
rpcClient.invoke("getrawtransaction", txId) collect {
def getRawTransaction(txid: ByteVector32)(implicit ec: ExecutionContext): Future[String] =
rpcClient.invoke("getrawtransaction", txid) collect {
case JString(raw) => raw
}

def getTransaction(txId: String)(implicit ec: ExecutionContext): Future[Transaction] =
getRawTransaction(txId).map(raw => Transaction.read(raw))
def getTransaction(txid: ByteVector32)(implicit ec: ExecutionContext): Future[Transaction] =
getRawTransaction(txid).map(raw => Transaction.read(raw))

def getTransactionMeta(txId: String)(implicit ec: ExecutionContext): Future[GetTxWithMetaResponse] =
def getTransactionMeta(txid: ByteVector32)(implicit ec: ExecutionContext): Future[GetTxWithMetaResponse] =
for {
tx_opt <- getTransaction(txId) map(Some(_)) recover { case _ => None }
tx_opt <- getTransaction(txid) map(Some(_)) recover { case _ => None }
blockchaininfo <- rpcClient.invoke("getblockchaininfo")
JInt(timestamp) = blockchaininfo \ "mediantime"
} yield GetTxWithMetaResponse(txid = ByteVector32.fromValidHex(txId), tx_opt, timestamp.toLong)
} yield GetTxWithMetaResponse(txid = txid, tx_opt, timestamp.toLong)

def isTransactionOutputSpendable(txId: String, outputIndex: Int, includeMempool: Boolean)(implicit ec: ExecutionContext): Future[Boolean] =
def isTransactionOutputSpendable(txid: ByteVector32, outputIndex: Int, includeMempool: Boolean)(implicit ec: ExecutionContext): Future[Boolean] =
for {
json <- rpcClient.invoke("gettxout", txId, outputIndex, includeMempool)
json <- rpcClient.invoke("gettxout", txid, outputIndex, includeMempool)
} yield json != JNull

/**
*
* @param txId transaction id
* @param txid transaction id
* @param ec
* @return a Future[height, index] where height is the height of the block where this transaction was published, and index is
* the index of the transaction in that block
*/
def getTransactionShortId(txId: String)(implicit ec: ExecutionContext): Future[(Int, Int)] = {
def getTransactionShortId(txid: ByteVector32)(implicit ec: ExecutionContext): Future[(Int, Int)] = {
val future = for {
Some(blockHash) <- getTxBlockHash(txId)
Some(blockHash) <- getTxBlockHash(txid)
json <- rpcClient.invoke("getblock", blockHash)
JInt(height) = json \ "height"
JString(hash) = json \ "hash"
JArray(txs) = json \ "tx"
index = txs.indexOf(JString(txId))
index = txs.indexOf(JString(txid.toHex))
} yield (height.toInt, index)

future
Expand All @@ -130,10 +130,10 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
} recoverWith {
case JsonRPCError(Error(-27, _)) =>
// "transaction already in block chain (code: -27)" ignore error
Future.successful(tx.txid.toString())
Future.successful(tx.txid.toHex)
case e@JsonRPCError(Error(-25, _)) =>
// "missing inputs (code: -25)" it may be that the tx has already been published and its output spent
getRawTransaction(tx.txid.toString()).map { case _ => tx.txid.toString() }.recoverWith { case _ => Future.failed[String](e) }
getRawTransaction(tx.txid).map { _ => tx.txid.toHex }.recoverWith { case _ => Future.failed[String](e) }
}

/**
Expand All @@ -154,14 +154,14 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
for {
_ <- Future.successful(0)
span0 = Kamon.spanBuilder("getblockhash").start()
blockHash: String <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOrElse[String](ByteVector32.Zeroes.toHex))
blockHash <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOpt[String].map(ByteVector32.fromValidHex).getOrElse(ByteVector32.Zeroes))
_ = span0.finish()
span1 = Kamon.spanBuilder("getblock").start()
txid: String <- rpcClient.invoke("getblock", blockHash).map {
txid: ByteVector32 <- rpcClient.invoke("getblock", blockHash).map {
case json => Try {
val JArray(txs) = json \ "tx"
txs(txIndex).extract[String]
} getOrElse ByteVector32.Zeroes.toHex
ByteVector32.fromValidHex(txs(txIndex).extract[String])
} getOrElse ByteVector32.Zeroes
}
_ = span1.finish()
span2 = Kamon.spanBuilder("getrawtx").start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package fr.acinq.eclair

import akka.actor.ActorSystem
import fr.acinq.bitcoin.{Block, Transaction}
import fr.acinq.bitcoin.{Block, ByteVector32, Transaction}
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, ExtendedBitcoinClient}

Expand All @@ -42,10 +42,10 @@ class TestBitcoinClient()(implicit system: ActorSystem) extends ExtendedBitcoinC
Future.successful(tx.txid.toString())
}

override def getTxConfirmations(txId: String)(implicit ec: ExecutionContext): Future[Option[Int]] = Future.successful(Some(10))
override def getTxConfirmations(txId: ByteVector32)(implicit ec: ExecutionContext): Future[Option[Int]] = Future.successful(Some(10))

override def getTransaction(txId: String)(implicit ec: ExecutionContext): Future[Transaction] = ???
override def getTransaction(txId: ByteVector32)(implicit ec: ExecutionContext): Future[Transaction] = ???

override def getTransactionShortId(txId: String)(implicit ec: ExecutionContext): Future[(Int, Int)] = Future.successful((400000, 42))
override def getTransactionShortId(txId: ByteVector32)(implicit ec: ExecutionContext): Future[(Int, Int)] = Future.successful((400000, 42))

}
Loading