Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Electrum: fixes and improvements #924

Merged
merged 5 commits into from
Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions eclair-core/src/main/resources/electrum/servers_mainnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,6 @@
"t": "50001",
"version": "1.4"
},
"electrum3.hachre.de": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"electrumx.bot.nu": {
"pruning": "-",
"s": "50002",
Expand Down Expand Up @@ -317,12 +311,6 @@
"t": "50001",
"version": "1.4"
},
"oneweek.duckdns.org": {
"pruning": "-",
"s": "50002",
"t": "50001",
"version": "1.4"
},
"orannis.com": {
"pruning": "-",
"s": "50002",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import scodec.bits.ByteVector
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
* For later optimizations, see http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html
Expand Down Expand Up @@ -599,9 +600,15 @@ object ElectrumClient {
case _ => ScriptHashSubscriptionResponse(scriptHash, "")
}
case BroadcastTransaction(tx) =>
val JString(txid) = json.result
require(ByteVector32.fromValidHex(txid) == tx.txid)
BroadcastTransactionResponse(tx, None)
val JString(message) = json.result
// if we got here, it means that the server's response does not contain an error and message should be our
// transaction id. However, it seems that at least on testnet some servers still use an older version of the
// Electrum protocol and return an error message in the result field
Try(ByteVector32.fromValidHex(message)) match {
case Success(txid) if txid == tx.txid => BroadcastTransactionResponse(tx, None)
case Success(txid) => BroadcastTransactionResponse(tx, Some(Error(1, s"response txid $txid does not match request txid ${tx.txid}")))
case Failure(_) => BroadcastTransactionResponse(tx, Some(Error(1, message)))
}
case GetHeader(height) =>
val JString(hex) = json.result
GetHeaderResponse(height, BlockHeader.read(hex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v

whenUnhandled {
case Event(Connect, _) =>
Random.shuffle(serverAddresses.toSeq diff addresses.values.toSeq).headOption match {
pickAddress(serverAddresses, addresses.values.toSet) match {
case Some(ElectrumServerAddress(address, ssl)) =>
val resolved = new InetSocketAddress(address.getHostName, address.getPort)
val client = context.actorOf(Props(new ElectrumClient(resolved, ssl)))
Expand Down Expand Up @@ -211,6 +211,16 @@ object ElectrumClientPool {
stream.close()
}

/**
*
* @param serverAddresses all addresses to choose from
* @param usedAddresses current connections
* @return a random address that we're not connected to yet
*/
def pickAddress(serverAddresses: Set[ElectrumServerAddress], usedAddresses: Set[InetSocketAddress]): Option[ElectrumServerAddress] = {
Random.shuffle(serverAddresses.filterNot(a => usedAddresses.contains(a.adress)).toSeq).headOption
}

// @formatter:off
sealed trait State
case object Disconnected extends State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import java.net.InetSocketAddress

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import akka.util.Timeout
import fr.acinq.bitcoin.{ByteVector32, Crypto, Transaction}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient._
import grizzled.slf4j.Logging
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import scodec.bits._

import scala.concurrent.duration._
import scala.util.Random


class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteLike with Logging with BeforeAndAfterAll {
Expand All @@ -35,17 +37,35 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
// this is tx #2690 of block #500000
val referenceTx = Transaction.read("0200000001983c5b32ced1de5ae97d3ce9b7436f8bb0487d15bf81e5cae97b1e238dc395c6000000006a47304402205957c75766e391350eba2c7b752f0056cb34b353648ecd0992a8a81fc9bcfe980220629c286592842d152cdde71177cd83086619744a533f262473298cacf60193500121021b8b51f74dbf0ac1e766d162c8707b5e8d89fc59da0796f3b4505e7c0fb4cf31feffffff0276bd0101000000001976a914219de672ba773aa0bc2e15cdd9d2e69b734138fa88ac3e692001000000001976a914301706dede031e9fb4b60836e073a4761855f6b188ac09a10700")
val scriptHash = Crypto.sha256(referenceTx.txOut(0).publicKeyScript).reverse
import scala.concurrent.ExecutionContext.Implicits.global
val serverAddresses = {
val stream = classOf[ElectrumClientSpec].getResourceAsStream("/electrum/servers_mainnet.json")
val addresses = ElectrumClientPool.readServerAddresses(stream, sslEnabled = false)
stream.close()
addresses
}

implicit val timeout = 20 seconds

import concurrent.ExecutionContext.Implicits.global

override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

test("pick a random, unused server address") {
val usedAddresses = Random.shuffle(serverAddresses.toSeq).take(serverAddresses.size / 2).map(_.adress).toSet
for(_ <- 1 to 10) {
val Some(pick) = ElectrumClientPool.pickAddress(serverAddresses, usedAddresses)
assert(!usedAddresses.contains(pick.adress))
}
}

test("init an electrumx connection pool") {
val random = new Random()
val stream = classOf[ElectrumClientSpec].getResourceAsStream("/electrum/servers_mainnet.json")
val addresses = ElectrumClientPool.readServerAddresses(stream, sslEnabled = false).take(2) + ElectrumClientPool.ElectrumServerAddress(new InetSocketAddress("electrum.acinq.co", 50002), SSL.STRICT)
assert(addresses.nonEmpty)
val addresses = random.shuffle(serverAddresses.toSeq).take(2).toSet + ElectrumClientPool.ElectrumServerAddress(new InetSocketAddress("electrum.acinq.co", 50002), SSL.STRICT)
stream.close()
assert(addresses.nonEmpty)
pool = system.actorOf(Props(new ElectrumClientPool(addresses)), "electrum-client")
}

Expand All @@ -54,19 +74,19 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
// make sure our master is stable, if the first master that we select is behind the other servers we will switch
// during the first few seconds
awaitCond({
probe.expectMsgType[ElectrumReady]
probe.expectMsgType[ElectrumReady](30 seconds)
probe.receiveOne(5 seconds) == null
}, max = 15 seconds, interval = 1000 millis) }
}, max = 60 seconds, interval = 1000 millis) }

test("get transaction") {
probe.send(pool, GetTransaction(referenceTx.txid))
val GetTransactionResponse(tx) = probe.expectMsgType[GetTransactionResponse]
val GetTransactionResponse(tx) = probe.expectMsgType[GetTransactionResponse](timeout)
assert(tx == referenceTx)
}

test("get merkle tree") {
probe.send(pool, GetMerkle(referenceTx.txid, 500000))
val response = probe.expectMsgType[GetMerkleResponse]
val response = probe.expectMsgType[GetMerkleResponse](timeout)
assert(response.txid == referenceTx.txid)
assert(response.block_height == 500000)
assert(response.pos == 2690)
Expand All @@ -76,26 +96,26 @@ class ElectrumClientPoolSpec extends TestKit(ActorSystem("test")) with FunSuiteL
test("header subscription") {
val probe1 = TestProbe()
probe1.send(pool, HeaderSubscription(probe1.ref))
val HeaderSubscriptionResponse(_, header) = probe1.expectMsgType[HeaderSubscriptionResponse]
val HeaderSubscriptionResponse(_, header) = probe1.expectMsgType[HeaderSubscriptionResponse](timeout)
logger.info(s"received header for block ${header.blockId}")
}

test("scripthash subscription") {
val probe1 = TestProbe()
probe1.send(pool, ScriptHashSubscription(scriptHash, probe1.ref))
val ScriptHashSubscriptionResponse(scriptHash1, status) = probe1.expectMsgType[ScriptHashSubscriptionResponse]
val ScriptHashSubscriptionResponse(scriptHash1, status) = probe1.expectMsgType[ScriptHashSubscriptionResponse](timeout)
assert(status != "")
}

test("get scripthash history") {
probe.send(pool, GetScriptHashHistory(scriptHash))
val GetScriptHashHistoryResponse(scriptHash1, history) = probe.expectMsgType[GetScriptHashHistoryResponse]
val GetScriptHashHistoryResponse(scriptHash1, history) = probe.expectMsgType[GetScriptHashHistoryResponse](timeout)
assert(history.contains((TransactionHistoryItem(500000, referenceTx.txid))))
}

test("list script unspents") {
probe.send(pool, ScriptHashListUnspent(scriptHash))
val ScriptHashListUnspentResponse(scriptHash1, unspents) = probe.expectMsgType[ScriptHashListUnspentResponse]
val ScriptHashListUnspentResponse(scriptHash1, unspents) = probe.expectMsgType[ScriptHashListUnspentResponse](timeout)
assert(unspents.isEmpty)
}
}