Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send events when HTLCs settle on-chain #884

Merged
merged 18 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/api/Service.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2018 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.api

import akka.http.scaladsl.server._
Expand All @@ -16,10 +32,12 @@ import akka.http.scaladsl.server.directives.{Credentials, LoggingMagnet}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Source}
import fr.acinq.eclair.io.NodeURI
import fr.acinq.eclair.payment.{PaymentLifecycle, PaymentReceived, PaymentRequest}
import fr.acinq.eclair.payment.PaymentLifecycle.PaymentFailed
import fr.acinq.eclair.payment._
import grizzled.slf4j.Logging
import org.json4s.ShortTypeHints
import org.json4s.jackson.Serialization
import scodec.bits.ByteVector

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

Expand All @@ -31,6 +49,15 @@ trait Service extends Directives with Logging {
import JsonSupport.marshaller
import JsonSupport.formats
import JsonSupport.serialization
// used to send typed messages over the websocket
val formatsWithTypeHint = formats.withTypeHintFieldName("type") +
ShortTypeHints(List(
classOf[PaymentSent],
classOf[PaymentRelayed],
classOf[PaymentReceived],
classOf[PaymentSettlingOnChain],
classOf[PaymentFailed]))


def password: String

Expand Down Expand Up @@ -65,13 +92,20 @@ trait Service extends Directives with Logging {
// create a flow transforming a queue of string -> string
val (flowInput, flowOutput) = Source.queue[String](10, OverflowStrategy.dropTail).toMat(BroadcastHub.sink[String])(Keep.both).run()

// register an actor that feeds the queue when a payment is received
// register an actor that feeds the queue on payment related events
actorSystem.actorOf(Props(new Actor {
override def preStart: Unit = context.system.eventStream.subscribe(self, classOf[PaymentReceived])

override def preStart: Unit = {
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
context.system.eventStream.subscribe(self, classOf[PaymentEvent])
}

def receive: Receive = {
case received: PaymentReceived => flowInput.offer(received.paymentHash.toString)
case message: PaymentFailed => flowInput.offer(Serialization.write(message)(formatsWithTypeHint))
case message: PaymentEvent => flowInput.offer(Serialization.write(message)(formatsWithTypeHint))
case other => logger.info(s"Unexpected ws message: $other")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhandled message should be handled differently

}

}))

Flow[Message]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
}
}
// we also need to fail outgoing htlcs that we know will never reach the blockchain
val overridenHtlcs = Closing.overriddenHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
val overridenHtlcs = Closing.overriddenOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
overridenHtlcs.foreach { add =>
d.commitments.originChannels.get(add.id) match {
case Some(origin) =>
Expand All @@ -1247,6 +1247,11 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
log.info(s"cannot fail overriden htlc #${add.id} paymentHash=${add.paymentHash} (origin not found)")
}
}
// for our outgoing payments, let's send events if we know that they will settle on chain
Closing
.onchainOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sstone can you double check this? this is quite sensitive

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes LGTM

.filter(add => Closing.isSentByLocal(add.id, d.commitments.originChannels)) // we only care about htlcs for which we were the original sender here
.foreach(add => context.system.eventStream.publish(PaymentSettlingOnChain(amount = MilliSatoshi(add.amountMsat), add.paymentHash)))
// then let's see if any of the possible close scenarii can be considered done
val mutualCloseDone = d.mutualClosePublished.exists(_.txid == tx.txid) // this case is trivial, in a mutual close scenario we only need to make sure that one of the closing txes is confirmed
val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false)
Expand Down
35 changes: 34 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import fr.acinq.bitcoin.{OutPoint, _}
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.crypto.{Generators, KeyManager}
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.payment.{Local, Origin}
import fr.acinq.eclair.transactions.Scripts._
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.transactions._
Expand Down Expand Up @@ -802,6 +803,38 @@ object Helpers {
}).toSet.flatten
}

/**
* Tells if we were the origin of this outgoing htlc
*
* @param htlcId
* @param originChannels
* @return
*/
def isSentByLocal(htlcId: Long, originChannels: Map[Long, Origin]) = (originChannels.get(htlcId) collect { case l: Local => l }).isDefined

/**
* As soon as a local or remote commitment reaches min_depth, we know which htlcs will be settled on-chain (whether
* or not they actually have an output in the commitment tx).
*
* @param localCommit
* @param remoteCommit
* @param nextRemoteCommit_opt
* @param tx
* @param eventStream
* @param log
*/
def onchainOutgoingHtlcs(localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[RemoteCommit], tx: Transaction): Set[UpdateAddHtlc] = {
if (localCommit.publishableTxs.commitTx.tx.txid == tx.txid) {
localCommit.spec.htlcs.filter(_.direction == OUT).map(_.add)
} else if (remoteCommit.txid == tx.txid) {
remoteCommit.spec.htlcs.filter(_.direction == IN).map(_.add)
} else if (nextRemoteCommit_opt.map(_.txid) == Some(tx.txid)) {
nextRemoteCommit_opt.get.spec.htlcs.filter(_.direction == IN).map(_.add)
} else {
Set.empty
}
}

/**
* If a local commitment tx reaches min_depth, we need to fail the outgoing htlcs that only us had signed, because
* they will never reach the blockchain.
Expand All @@ -814,7 +847,7 @@ object Helpers {
* @param log
* @return
*/
def overriddenHtlcs(localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[RemoteCommit], tx: Transaction)(implicit log: LoggingAdapter): Set[UpdateAddHtlc] =
def overriddenOutgoingHtlcs(localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[RemoteCommit], tx: Transaction)(implicit log: LoggingAdapter): Set[UpdateAddHtlc] =
if (localCommit.publishableTxs.commitTx.tx.txid == tx.txid) {
// our commit got confirmed, so any htlc that we signed but they didn't sign will never reach the chain
val mostRecentRemoteCommit = nextRemoteCommit_opt.getOrElse(remoteCommit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class LocalPaymentHandler(nodeParams: NodeParams) extends Actor with ActorLoggin
case PurgeExpiredRequests =>
context.become(run(hash2preimage.filterNot { case (_, pr) => hasExpired(pr) }))

case ReceivePayment(amount_opt, desc, expirySeconds_opt, extraHops) =>
case ReceivePayment(amount_opt, desc, expirySeconds_opt, extraHops, fallbackAddress_opt) =>
Try {
if (hash2preimage.size > nodeParams.maxPendingPaymentRequests) {
throw new RuntimeException(s"too many pending payment requests (max=${nodeParams.maxPendingPaymentRequests})")
}
val paymentPreimage = randomBytes32
val paymentHash = Crypto.sha256(paymentPreimage)
val expirySeconds = expirySeconds_opt.getOrElse(nodeParams.paymentRequestExpiry.toSeconds)
val paymentRequest = PaymentRequest(nodeParams.chainHash, amount_opt, paymentHash, nodeParams.privateKey, desc, fallbackAddress = None, expirySeconds = Some(expirySeconds), extraHops = extraHops)
val paymentRequest = PaymentRequest(nodeParams.chainHash, amount_opt, paymentHash, nodeParams.privateKey, desc, fallbackAddress_opt, expirySeconds = Some(expirySeconds), extraHops = extraHops)
log.debug(s"generated payment request=${PaymentRequest.write(paymentRequest)} from amount=$amount_opt")
sender ! paymentRequest
context.become(run(hash2preimage + (paymentHash -> PendingPaymentRequest(paymentPreimage, paymentRequest))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ case class PaymentSent(amount: MilliSatoshi, feesPaid: MilliSatoshi, paymentHash
case class PaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentReceived(amount: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentSettlingOnChain(amount: MilliSatoshi, paymentHash: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ object PaymentLifecycle {
def props(sourceNodeId: PublicKey, router: ActorRef, register: ActorRef) = Props(classOf[PaymentLifecycle], sourceNodeId, router, register)

// @formatter:off
case class ReceivePayment(amountMsat_opt: Option[MilliSatoshi], description: String, expirySeconds_opt: Option[Long] = None, extraHops: List[List[ExtraHop]] = Nil)
case class ReceivePayment(amountMsat_opt: Option[MilliSatoshi], description: String, expirySeconds_opt: Option[Long] = None, extraHops: List[List[ExtraHop]] = Nil, fallbackAddress: Option[String] = None)
/**
* @param maxFeePct set by default to 3% as a safety measure (even if a route is found, if fee is higher than that payment won't be attempted)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ object PaymentRequest {
f.version match {
case 17 if prefix == "lnbc" => Base58Check.encode(Base58.Prefix.PubkeyAddress, data)
case 18 if prefix == "lnbc" => Base58Check.encode(Base58.Prefix.ScriptAddress, data)
case 17 if prefix == "lntb" => Base58Check.encode(Base58.Prefix.PubkeyAddressTestnet, data)
case 18 if prefix == "lntb" => Base58Check.encode(Base58.Prefix.ScriptAddressTestnet, data)
case 17 if prefix == "lntb" || prefix == "lnbcrt" => Base58Check.encode(Base58.Prefix.PubkeyAddressTestnet, data)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sstone can you double check this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's correct because Base58 addresses on testnet and regtest have the same prefix, but not Bech32 addresses :(
But I think handling "lnbcrt" on a separate line looks a bit cleaner

case 18 if prefix == "lntb" || prefix == "lnbcrt" => Base58Check.encode(Base58.Prefix.ScriptAddressTestnet, data)
case version if prefix == "lnbc" => Bech32.encodeWitnessAddress("bc", version, data)
case version if prefix == "lntb" => Bech32.encodeWitnessAddress("tb", version, data)
case version if prefix == "lntb" || prefix == "lnbcrt" => Bech32.encodeWitnessAddress("tb", version, data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bech32 prefix should be "bcrt", not "tb"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 5e476fd

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@ package fr.acinq.eclair.api
import akka.actor.{Actor, ActorSystem, Props, Scheduler}
import org.scalatest.FunSuite
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest, WSProbe}
import fr.acinq.eclair._
import fr.acinq.eclair.io.Peer.{GetPeerInfo, PeerInfo}
import TestConstants._
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.{Directives, Route}
import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.{ContentTypes, FormData, MediaTypes, Multipart}
import fr.acinq.bitcoin.{ByteVector32, Crypto}
import fr.acinq.bitcoin.{ByteVector32, Crypto, MilliSatoshi}
import fr.acinq.eclair.channel.RES_GETINFO
import fr.acinq.eclair.db.{NetworkFee, Stats}
import fr.acinq.eclair.payment.{PaymentLifecycle, PaymentRequest}
import fr.acinq.eclair.payment.PaymentLifecycle.PaymentFailed
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.{ChannelDesc, RouteResponse}
import fr.acinq.eclair.wire.{ChannelUpdate, NodeAddress, NodeAnnouncement}
import org.json4s.jackson.Serialization
import scodec.bits.ByteVector
import scala.concurrent.Future
import scala.concurrent.duration._
Expand Down Expand Up @@ -254,6 +256,53 @@ class ApiServiceSpec extends FunSuite with ScalatestRouteTest {
}
}

test("the websocket should return typed objects") {

val mockService = new MockService(new EclairMock {})

val websocketRoute = Directives.path("ws") {
Directives.handleWebSocketMessages(mockService.makeSocketHandler)
}

val wsClient = WSProbe()

WS("/ws", wsClient.flow) ~> websocketRoute ~>
check {

val pf = PaymentFailed(ByteVector32.Zeroes, failures = Seq.empty)
val expectedSerializedPf = """{"type":"PaymentLifecycle$PaymentFailed","paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","failures":[]}"""
Serialization.write(pf)(mockService.formatsWithTypeHint) === expectedSerializedPf
system.eventStream.publish(pf)
wsClient.expectMessage(expectedSerializedPf)

val ps = PaymentSent(amount = MilliSatoshi(21), feesPaid = MilliSatoshi(1), paymentHash = ByteVector32.Zeroes, paymentPreimage = ByteVector32.One, toChannelId = ByteVector32.Zeroes, timestamp = 1553784337711L)
val expectedSerializedPs = """{"type":"PaymentSent","amount":21,"feesPaid":1,"paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","paymentPreimage":"0100000000000000000000000000000000000000000000000000000000000000","toChannelId":"0000000000000000000000000000000000000000000000000000000000000000","timestamp":1553784337711}"""
Serialization.write(ps)(mockService.formatsWithTypeHint) === expectedSerializedPs
system.eventStream.publish(ps)
wsClient.expectMessage(expectedSerializedPs)

val prel = PaymentRelayed(amountIn = MilliSatoshi(21), amountOut = MilliSatoshi(20), paymentHash = ByteVector32.Zeroes, fromChannelId = ByteVector32.Zeroes, ByteVector32.One, timestamp = 1553784963659L)
val expectedSerializedPrel = """{"type":"PaymentRelayed","amountIn":21,"amountOut":20,"paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","fromChannelId":"0000000000000000000000000000000000000000000000000000000000000000","toChannelId":"0100000000000000000000000000000000000000000000000000000000000000","timestamp":1553784963659}"""
Serialization.write(prel)(mockService.formatsWithTypeHint) === expectedSerializedPrel
system.eventStream.publish(prel)
wsClient.expectMessage(expectedSerializedPrel)

val precv = PaymentReceived(amount = MilliSatoshi(21), paymentHash = ByteVector32.Zeroes, fromChannelId = ByteVector32.One, timestamp = 1553784963659L)
val expectedSerializedPrecv = """{"type":"PaymentReceived","amount":21,"paymentHash":"0000000000000000000000000000000000000000000000000000000000000000","fromChannelId":"0100000000000000000000000000000000000000000000000000000000000000","timestamp":1553784963659}"""
Serialization.write(precv)(mockService.formatsWithTypeHint) === expectedSerializedPrecv
system.eventStream.publish(precv)
wsClient.expectMessage(expectedSerializedPrecv)

val pset = PaymentSettlingOnChain(amount = MilliSatoshi(21), paymentHash = ByteVector32.One, timestamp = 1553785442676L)
val expectedSerializedPset = """{"type":"PaymentSettlingOnChain","amount":21,"paymentHash":"0100000000000000000000000000000000000000000000000000000000000000","timestamp":1553785442676}"""
Serialization.write(pset)(mockService.formatsWithTypeHint) === expectedSerializedPset
system.eventStream.publish(pset)
wsClient.expectMessage(expectedSerializedPset)
}


}

private def matchTestJson(apiName: String, response: String) = {
val resource = getClass.getResourceAsStream(s"/api/$apiName")
val expectedResponse = Try(Source.fromInputStream(resource).mkString).getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package fr.acinq.eclair.api

import java.net.InetAddress

import fr.acinq.bitcoin.{MilliSatoshi, OutPoint}
import fr.acinq.eclair._
import fr.acinq.eclair.payment.{PaymentRequest, PaymentSettlingOnChain}
import fr.acinq.bitcoin.{ByteVector32, OutPoint}
import fr.acinq.eclair.payment.PaymentRequest
import fr.acinq.eclair.transactions.{IN, OUT}
import fr.acinq.eclair.wire.{NodeAddress, Tor2, Tor3}
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, ShortTypeHints}
import org.scalatest.{FunSuite, Matchers}
import scodec.bits._

Expand Down Expand Up @@ -71,4 +75,10 @@ class JsonSerializersSpec extends FunSuite with Matchers {
val pr = PaymentRequest.read(ref)
Serialization.write(pr)(org.json4s.DefaultFormats + new PaymentRequestSerializer) shouldBe """{"prefix":"lnbc","amount":250000000,"timestamp":1496314658,"nodeId":"03e7156ae33b0a208d0744199163177e909e80176e55d97a2f221ede0f934dd9ad","description":"1 cup coffee","paymentHash":"0001020304050607080900010203040506070809000102030405060708090102","expiry":60,"minFinalCltvExpiry":null}"""
}

test("type hints") {
implicit val formats = DefaultFormats.withTypeHintFieldName("type") + ShortTypeHints(List(classOf[PaymentSettlingOnChain])) + new MilliSatoshiSerializer
val e1 = PaymentSettlingOnChain(MilliSatoshi(42), randomBytes32)
assert(Serialization.writePretty(e1).contains("\"type\" : \"PaymentSettlingOnChain\""))
}
}