Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send events when HTLCs settle on-chain #884

Merged
merged 18 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import fr.acinq.eclair.transactions.Transactions.{InputInfo, TransactionWithInpu
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{ShortChannelId, UInt64}
import org.json4s.JsonAST._
import org.json4s.{CustomKeySerializer, CustomSerializer, jackson}
import org.json4s.{CustomKeySerializer, CustomSerializer, TypeHints, jackson}
import scodec.bits.ByteVector

/**
Expand Down Expand Up @@ -186,4 +186,15 @@ object JsonSupport extends Json4sSupport {

implicit val shouldWritePretty: ShouldWritePretty = ShouldWritePretty.True

case class CustomTypeHints(custom: Map[Class[_], String]) extends TypeHints {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be put in a separate file?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it belongs here logically, it's where we define our application-wide JsonSupport and all the custom serializers.

val reverse: Map[String, Class[_]] = custom.map(_.swap)

override val hints: List[Class[_]] = custom.keys.toList
override def hintFor(clazz: Class[_]): String = custom.getOrElse(clazz, {
throw new IllegalArgumentException(s"No type hint mapping found for $clazz")
})
override def classFor(hint: String): Option[Class[_]] = reverse.get(hint)
}


}
43 changes: 39 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/api/Service.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2018 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.api

import akka.http.scaladsl.server._
Expand All @@ -15,9 +31,13 @@ import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.directives.{Credentials, LoggingMagnet}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Source}
import fr.acinq.eclair.api.JsonSupport.CustomTypeHints
import fr.acinq.eclair.io.NodeURI
import fr.acinq.eclair.payment.{PaymentLifecycle, PaymentReceived, PaymentRequest}
import fr.acinq.eclair.payment.PaymentLifecycle.PaymentFailed
import fr.acinq.eclair.payment._
import grizzled.slf4j.Logging
import org.json4s.{ShortTypeHints, TypeHints}
import org.json4s.jackson.Serialization
import scodec.bits.ByteVector

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -31,6 +51,15 @@ trait Service extends Directives with Logging {
import JsonSupport.marshaller
import JsonSupport.formats
import JsonSupport.serialization
// used to send typed messages over the websocket
val formatsWithTypeHint = formats.withTypeHintFieldName("type") +
CustomTypeHints(Map(
classOf[PaymentSent] -> "payment-sent",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

classOf[PaymentRelayed] -> "payment-relayed",
classOf[PaymentReceived] -> "payment-received",
classOf[PaymentSettlingOnChain] -> "payment-settling-onchain",
classOf[PaymentFailed] -> "payment-failed"
))

def password: String

Expand Down Expand Up @@ -65,13 +94,19 @@ trait Service extends Directives with Logging {
// create a flow transforming a queue of string -> string
val (flowInput, flowOutput) = Source.queue[String](10, OverflowStrategy.dropTail).toMat(BroadcastHub.sink[String])(Keep.both).run()

// register an actor that feeds the queue when a payment is received
// register an actor that feeds the queue on payment related events
actorSystem.actorOf(Props(new Actor {
override def preStart: Unit = context.system.eventStream.subscribe(self, classOf[PaymentReceived])

override def preStart: Unit = {
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
context.system.eventStream.subscribe(self, classOf[PaymentEvent])
}

def receive: Receive = {
case received: PaymentReceived => flowInput.offer(received.paymentHash.toString)
case message: PaymentFailed => flowInput.offer(Serialization.write(message)(formatsWithTypeHint))
case message: PaymentEvent => flowInput.offer(Serialization.write(message)(formatsWithTypeHint))
}

}))

Flow[Message]
Expand Down
15 changes: 10 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1236,7 +1236,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
}
}
// we also need to fail outgoing htlcs that we know will never reach the blockchain
val overridenHtlcs = Closing.overriddenHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
val overridenHtlcs = Closing.overriddenOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
overridenHtlcs.foreach { add =>
d.commitments.originChannels.get(add.id) match {
case Some(origin) =>
Expand All @@ -1247,15 +1247,20 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
log.info(s"cannot fail overriden htlc #${add.id} paymentHash=${add.paymentHash} (origin not found)")
}
}
// for our outgoing payments, let's send events if we know that they will settle on chain
Closing
.onchainOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sstone can you double check this? this is quite sensitive

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes LGTM

.filter(add => Closing.isSentByLocal(add.id, d.commitments.originChannels)) // we only care about htlcs for which we were the original sender here
.foreach(add => context.system.eventStream.publish(PaymentSettlingOnChain(amount = MilliSatoshi(add.amountMsat), add.paymentHash)))
// then let's see if any of the possible close scenarii can be considered done
val mutualCloseDone = d.mutualClosePublished.exists(_.txid == tx.txid) // this case is trivial, in a mutual close scenario we only need to make sure that one of the closing txes is confirmed
val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false)
val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false)
val remoteCommitDone = remoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
val nextRemoteCommitDone = nextRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
val futureRemoteCommitDone = futureRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
val revokedCommitDone = revokedCommitPublished1.map(Closing.isRevokedCommitDone(_)).exists(_ == true) // we only need one revoked commit done
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1)
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1)
// we also send events related to fee
Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) }
val closeType_opt = if (mutualCloseDone) {
Expand Down Expand Up @@ -1609,7 +1614,7 @@ class Channel(val nodeParams: NodeParams, wallet: EclairWallet, remoteNodeId: Pu
}

(state, nextState, stateData, nextStateData) match {
// ORDER MATTERS!
// ORDER MATTERS!
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
()
Expand Down
44 changes: 39 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import fr.acinq.bitcoin.{OutPoint, _}
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.crypto.{Generators, KeyManager}
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.payment.{Local, Origin}
import fr.acinq.eclair.transactions.Scripts._
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.transactions._
Expand Down Expand Up @@ -262,9 +263,9 @@ object Helpers {
*
* @param now current timet
* @param waitingSince we have been waiting since that time
* @param delay the nominal delay that we were supposed to wait
* @param minDelay the minimum delay even if the nominal one has expired
* @return the delay we will actually wait
* @param delay the nominal delay that we were supposed to wait
* @param minDelay the minimum delay even if the nominal one has expired
* @return the delay we will actually wait
*/
def computeFundingTimeout(now: Long, waitingSince: Long, delay: FiniteDuration, minDelay: FiniteDuration): FiniteDuration = {
import scala.concurrent.duration._
Expand Down Expand Up @@ -802,6 +803,39 @@ object Helpers {
}).toSet.flatten
}

/**
* Tells if we were the origin of this outgoing htlc
*
* @param htlcId
* @param originChannels
* @return
*/
def isSentByLocal(htlcId: Long, originChannels: Map[Long, Origin]) = originChannels.get(htlcId) match {
case Some(Local(_)) => true
case _ => false
}

/**
* As soon as a local or remote commitment reaches min_depth, we know which htlcs will be settled on-chain (whether
* or not they actually have an output in the commitment tx).
*
* @param localCommit
* @param remoteCommit
* @param nextRemoteCommit_opt
* @param tx a transaction that is sufficiently buried in the blockchain
*/
def onchainOutgoingHtlcs(localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[RemoteCommit], tx: Transaction): Set[UpdateAddHtlc] = {
if (localCommit.publishableTxs.commitTx.tx.txid == tx.txid) {
localCommit.spec.htlcs.filter(_.direction == OUT).map(_.add)
} else if (remoteCommit.txid == tx.txid) {
remoteCommit.spec.htlcs.filter(_.direction == IN).map(_.add)
} else if (nextRemoteCommit_opt.map(_.txid) == Some(tx.txid)) {
nextRemoteCommit_opt.get.spec.htlcs.filter(_.direction == IN).map(_.add)
} else {
Set.empty
}
}

/**
* If a local commitment tx reaches min_depth, we need to fail the outgoing htlcs that only us had signed, because
* they will never reach the blockchain.
Expand All @@ -814,7 +848,7 @@ object Helpers {
* @param log
* @return
*/
def overriddenHtlcs(localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[RemoteCommit], tx: Transaction)(implicit log: LoggingAdapter): Set[UpdateAddHtlc] =
def overriddenOutgoingHtlcs(localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[RemoteCommit], tx: Transaction)(implicit log: LoggingAdapter): Set[UpdateAddHtlc] =
if (localCommit.publishableTxs.commitTx.tx.txid == tx.txid) {
// our commit got confirmed, so any htlc that we signed but they didn't sign will never reach the chain
val mostRecentRemoteCommit = nextRemoteCommit_opt.getOrElse(remoteCommit)
Expand Down Expand Up @@ -1053,4 +1087,4 @@ object Helpers {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class LocalPaymentHandler(nodeParams: NodeParams) extends Actor with ActorLoggin
case PurgeExpiredRequests =>
context.become(run(hash2preimage.filterNot { case (_, pr) => hasExpired(pr) }))

case ReceivePayment(amount_opt, desc, expirySeconds_opt, extraHops) =>
case ReceivePayment(amount_opt, desc, expirySeconds_opt, extraHops, fallbackAddress_opt) =>
Try {
if (hash2preimage.size > nodeParams.maxPendingPaymentRequests) {
throw new RuntimeException(s"too many pending payment requests (max=${nodeParams.maxPendingPaymentRequests})")
}
val paymentPreimage = randomBytes32
val paymentHash = Crypto.sha256(paymentPreimage)
val expirySeconds = expirySeconds_opt.getOrElse(nodeParams.paymentRequestExpiry.toSeconds)
val paymentRequest = PaymentRequest(nodeParams.chainHash, amount_opt, paymentHash, nodeParams.privateKey, desc, fallbackAddress = None, expirySeconds = Some(expirySeconds), extraHops = extraHops)
val paymentRequest = PaymentRequest(nodeParams.chainHash, amount_opt, paymentHash, nodeParams.privateKey, desc, fallbackAddress_opt, expirySeconds = Some(expirySeconds), extraHops = extraHops)
log.debug(s"generated payment request=${PaymentRequest.write(paymentRequest)} from amount=$amount_opt")
sender ! paymentRequest
context.become(run(hash2preimage + (paymentHash -> PendingPaymentRequest(paymentPreimage, paymentRequest))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ case class PaymentSent(amount: MilliSatoshi, feesPaid: MilliSatoshi, paymentHash
case class PaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentReceived(amount: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent

case class PaymentSettlingOnChain(amount: MilliSatoshi, paymentHash: ByteVector32, timestamp: Long = Platform.currentTime) extends PaymentEvent
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ object PaymentLifecycle {
def props(sourceNodeId: PublicKey, router: ActorRef, register: ActorRef) = Props(classOf[PaymentLifecycle], sourceNodeId, router, register)

// @formatter:off
case class ReceivePayment(amountMsat_opt: Option[MilliSatoshi], description: String, expirySeconds_opt: Option[Long] = None, extraHops: List[List[ExtraHop]] = Nil)
case class ReceivePayment(amountMsat_opt: Option[MilliSatoshi], description: String, expirySeconds_opt: Option[Long] = None, extraHops: List[List[ExtraHop]] = Nil, fallbackAddress: Option[String] = None)
/**
* @param maxFeePct set by default to 3% as a safety measure (even if a route is found, if fee is higher than that payment won't be attempted)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,11 @@ object PaymentRequest {
f.version match {
case 17 if prefix == "lnbc" => Base58Check.encode(Base58.Prefix.PubkeyAddress, data)
case 18 if prefix == "lnbc" => Base58Check.encode(Base58.Prefix.ScriptAddress, data)
case 17 if prefix == "lntb" => Base58Check.encode(Base58.Prefix.PubkeyAddressTestnet, data)
case 18 if prefix == "lntb" => Base58Check.encode(Base58.Prefix.ScriptAddressTestnet, data)
case 17 if prefix == "lntb" || prefix == "lnbcrt" => Base58Check.encode(Base58.Prefix.PubkeyAddressTestnet, data)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sstone can you double check this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's correct because Base58 addresses on testnet and regtest have the same prefix, but not Bech32 addresses :(
But I think handling "lnbcrt" on a separate line looks a bit cleaner

case 18 if prefix == "lntb" || prefix == "lnbcrt" => Base58Check.encode(Base58.Prefix.ScriptAddressTestnet, data)
case version if prefix == "lnbc" => Bech32.encodeWitnessAddress("bc", version, data)
case version if prefix == "lntb" => Bech32.encodeWitnessAddress("tb", version, data)
case version if prefix == "lnbcrt" => Bech32.encodeWitnessAddress("bcrt", version, data)
}
}
}
Expand Down
Loading