Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add singleton actor AsyncPaymentTriggerer to monitor when the receiver of an async payment reconnects #2491

Merged
merged 9 commits into from
Jan 2, 2023
6 changes: 4 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Peer, Server, Switchboard}
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
Expand Down Expand Up @@ -296,7 +296,8 @@ class Setup(val datadir: File,
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register), "payment-handler", SupervisorStrategy.Resume))
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "peer-watcher")
t-bast marked this conversation as resolved.
Show resolved Hide resolved
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
// we want to make sure the handler for post-restart broken HTLCs has finished initializing.
_ <- postRestartCleanUpInitialized.future
Expand All @@ -311,6 +312,7 @@ class Setup(val datadir: File,
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)), "payment-initiator", SupervisorStrategy.Restart))
_ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart))

_ = triggerer ! AsyncPaymentTriggerer.Start(switchboard.toTyped)
balanceActor = system.spawn(BalanceActor(nodeParams.db, bitcoinClient, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")

postman = system.spawn(Behaviors.supervise(Postman(switchboard.toTyped)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2022 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.payment.relay

import akka.actor.typed.ActorRef.ActorRefOps
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.io.PeerReadyNotifier.NotifyWhenPeerReady
import fr.acinq.eclair.io.{PeerReadyNotifier, Switchboard}
import fr.acinq.eclair.payment.relay.AsyncPaymentTriggerer.Command
import fr.acinq.eclair.{BlockHeight, Logs}

/**
* This actor waits for an async payment receiver to become ready to receive a payment or for a block timeout to expire.
* If the receiver of the payment is a connected peer, spawn a PeerReadyNotifier actor.
*/
object AsyncPaymentTriggerer {
// @formatter:off
sealed trait Command
case class Start(switchboard: ActorRef[Switchboard.GetPeerInfo]) extends Command
case class Watch(replyTo: ActorRef[Result], remoteNodeId: PublicKey, paymentHash: ByteVector32, timeout: BlockHeight) extends Command
case class Cancel(paymentHash: ByteVector32) extends Command
private[relay] case class NotifierStopped(remoteNodeId: PublicKey) extends Command
private case class WrappedPeerReadyResult(result: PeerReadyNotifier.Result) extends Command
private case class WrappedCurrentBlockHeight(currentBlockHeight: CurrentBlockHeight) extends Command

sealed trait Result
case object AsyncPaymentTriggered extends Result
case object AsyncPaymentTimeout extends Result
case object AsyncPaymentCanceled extends Result
// @formatter:on

def apply(): Behavior[Command] = Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(category_opt = Some(LogCategory.PAYMENT))) {
Behaviors.receiveMessagePartial {
case Start(switchboard) => new AsyncPaymentTriggerer(switchboard, context).start()
}
}
}
}

private class AsyncPaymentTriggerer(switchboard: ActorRef[Switchboard.GetPeerInfo], context: ActorContext[Command]) {

import AsyncPaymentTriggerer._

case class Payment(replyTo: ActorRef[Result], timeout: BlockHeight, paymentHash: ByteVector32) {
def expired(currentBlockHeight: BlockHeight): Boolean = timeout <= currentBlockHeight
}

case class PeerPayments(notifier: ActorRef[PeerReadyNotifier.Command], pendingPayments: Set[Payment]) {
def update(currentBlockHeight: BlockHeight): Option[PeerPayments] = {
val expiredPayments = pendingPayments.filter(_.expired(currentBlockHeight))
expiredPayments.foreach(e => e.replyTo ! AsyncPaymentTimeout)
updatePaymentsOrStop(pendingPayments.removedAll(expiredPayments))
}

def cancel(paymentHash: ByteVector32): Option[PeerPayments] = {
val canceledPayment = pendingPayments.find(_.paymentHash == paymentHash)
if (canceledPayment.isDefined) canceledPayment.get.replyTo ! AsyncPaymentCanceled
updatePaymentsOrStop(pendingPayments.removedAll(canceledPayment))
t-bast marked this conversation as resolved.
Show resolved Hide resolved
}

private def updatePaymentsOrStop(pendingPayments: Set[Payment]): Option[PeerPayments] = {
if (pendingPayments.isEmpty) {
context.stop(notifier)
None
} else {
Some(PeerPayments(notifier, pendingPayments))
}
}

def trigger(): Unit = pendingPayments.foreach(e => e.replyTo ! AsyncPaymentTriggered)
def timeout(): Unit = pendingPayments.foreach(e => e.replyTo ! AsyncPaymentTimeout)
}

def start(): Behavior[Command] = {
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](WrappedCurrentBlockHeight))
t-bast marked this conversation as resolved.
Show resolved Hide resolved
watching(Map.empty)
}

private def watching(peers: Map[PublicKey, PeerPayments]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case Watch(replyTo, remoteNodeId, paymentHash, timeout) =>
peers.get(remoteNodeId) match {
case None =>
val notifier = context.spawn(
Behaviors.supervise(PeerReadyNotifier(remoteNodeId, switchboard, None)).onFailure(SupervisorStrategy.stop),
s"peer-ready-notifier-$remoteNodeId",
)
t-bast marked this conversation as resolved.
Show resolved Hide resolved
context.watchWith(notifier, NotifierStopped(remoteNodeId))
notifier ! NotifyWhenPeerReady(context.messageAdapter[PeerReadyNotifier.Result](WrappedPeerReadyResult))
val peer = PeerPayments(notifier, Set(Payment(replyTo, timeout, paymentHash)))
watching(peers + (remoteNodeId -> peer))
case Some(peer) =>
val peer1 = PeerPayments(peer.notifier, peer.pendingPayments + Payment(replyTo, timeout, paymentHash))
watching(peers + (remoteNodeId -> peer1))
}
case Cancel(paymentHash) =>
val peers1 = peers.flatMap {
case (remoteNodeId, peer) => peer.cancel(paymentHash).map(peer1 => remoteNodeId -> peer1)
}
watching(peers1)
case WrappedCurrentBlockHeight(CurrentBlockHeight(currentBlockHeight)) =>
val peers1 = peers.flatMap {
case (remoteNodeId, peer) => peer.update(currentBlockHeight).map(peer1 => remoteNodeId -> peer1)
}
watching(peers1)
case WrappedPeerReadyResult(PeerReadyNotifier.PeerReady(remoteNodeId, _)) =>
t-bast marked this conversation as resolved.
Show resolved Hide resolved
// notify watcher that destination peer is ready to receive async payments; PeerReadyNotifier will stop itself
peers.get(remoteNodeId).foreach(_.trigger())
watching(peers - remoteNodeId)
case NotifierStopped(remoteNodeId) =>
peers.get(remoteNodeId) match {
case None => Behaviors.same
case Some(peer) =>
context.log.error(s"PeerReadyNotifier stopped unexpectedly while watching node $remoteNodeId.")
peer.timeout()
t-bast marked this conversation as resolved.
Show resolved Hide resolved
watching(peers - remoteNodeId)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

package fr.acinq.eclair.payment.relay

import akka.actor.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.{TypedActorContextOps, TypedActorRefOps}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.{ActorRef, typed}
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC}
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.payment.IncomingPaymentPacket.NodeRelayPacket
Expand All @@ -40,7 +39,7 @@ import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.router.{BalanceTooLow, RouteNotFound}
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, UInt64, nodeFee, randomBytes32}
import fr.acinq.eclair.{CltvExpiry, Features, Logs, MilliSatoshi, NodeParams, UInt64, nodeFee, randomBytes32}

import java.util.UUID
import scala.collection.immutable.Queue
Expand All @@ -56,14 +55,13 @@ object NodeRelay {
case class Relay(nodeRelayPacket: IncomingPaymentPacket.NodeRelayPacket) extends Command
case object Stop extends Command
case object RelayAsyncPayment extends Command
case object CancelAsyncPayment extends Command
private case class WrappedMultiPartExtraPaymentReceived(mppExtraReceived: MultiPartPaymentFSM.ExtraPaymentReceived[HtlcPart]) extends Command
private case class WrappedMultiPartPaymentFailed(mppFailed: MultiPartPaymentFSM.MultiPartPaymentFailed) extends Command
private case class WrappedMultiPartPaymentSucceeded(mppSucceeded: MultiPartPaymentFSM.MultiPartPaymentSucceeded) extends Command
private case class WrappedPreimageReceived(preimageReceived: PreimageReceived) extends Command
private case class WrappedPaymentSent(paymentSent: PaymentSent) extends Command
private case class WrappedPaymentFailed(paymentFailed: PaymentFailed) extends Command
private case class WrappedCurrentBlockHeight(currentBlockHeight: BlockHeight) extends Command
private[relay] case class WrappedPeerReadyResult(result: AsyncPaymentTriggerer.Result) extends Command
// @formatter:on

trait OutgoingPaymentFactory {
Expand All @@ -87,7 +85,8 @@ object NodeRelay {
register: ActorRef,
relayId: UUID,
nodeRelayPacket: NodeRelayPacket,
outgoingPaymentFactory: OutgoingPaymentFactory): Behavior[Command] =
outgoingPaymentFactory: OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command]): Behavior[Command] =
Behaviors.setup { context =>
val paymentHash = nodeRelayPacket.add.paymentHash
val totalAmountIn = nodeRelayPacket.outerPayload.totalAmount
Expand All @@ -102,7 +101,7 @@ object NodeRelay {
context.messageAdapter[MultiPartPaymentFSM.MultiPartPaymentSucceeded](WrappedMultiPartPaymentSucceeded)
}.toClassic
val incomingPaymentHandler = context.actorOf(MultiPartPaymentFSM.props(nodeParams, paymentHash, totalAmountIn, mppFsmAdapters))
new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory)
new NodeRelay(nodeParams, parent, register, relayId, paymentHash, nodeRelayPacket.outerPayload.paymentSecret, context, outgoingPaymentFactory, triggerer)
.receiving(Queue.empty, nodeRelayPacket.innerPayload, nodeRelayPacket.nextPacket, incomingPaymentHandler)
}
}
Expand Down Expand Up @@ -170,7 +169,8 @@ class NodeRelay private(nodeParams: NodeParams,
paymentHash: ByteVector32,
paymentSecret: ByteVector32,
context: ActorContext[NodeRelay.Command],
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory) {
outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory,
triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command]) {

import NodeRelay._

Expand Down Expand Up @@ -214,31 +214,24 @@ class NodeRelay private(nodeParams: NodeParams,

private def waitForTrigger(upstream: Upstream.Trampoline, nextPayload: IntermediatePayload.NodeRelay.Standard, nextPacket: OnionRoutingPacket): Behavior[Command] = {
context.log.info(s"waiting for async payment to trigger before relaying trampoline payment (amountIn=${upstream.amountIn} expiryIn=${upstream.expiryIn} amountOut=${nextPayload.amountToForward} expiryOut=${nextPayload.outgoingCltv}, asyncPaymentsParams=${nodeParams.relayParams.asyncPaymentsParams})")
// a trigger must be received before waiting more than `holdTimeoutBlocks`
val timeoutBlock: BlockHeight = nodeParams.currentBlockHeight + nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks
// a trigger must be received `cancelSafetyBeforeTimeoutBlocks` before the incoming payment cltv expiry
val safetyBlock: BlockHeight = (upstream.expiryIn - nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout).blockHeight
val messageAdapter = context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))
context.system.eventStream ! EventStream.Subscribe[CurrentBlockHeight](messageAdapter)
val timeoutBlock = nodeParams.currentBlockHeight + nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks
val safetyBlock = (upstream.expiryIn - nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout).blockHeight
// wait for notification until which ever occurs first: the hold timeout block or the safety block
val notifierTimeout = Seq(timeoutBlock, safetyBlock).min
val peerReadyResultAdapter = context.messageAdapter[AsyncPaymentTriggerer.Result](WrappedPeerReadyResult)

// TODO: send the WaitingToRelayPayment message to an actor that watches for the payment receiver to come back online before sending the RelayAsyncPayment message
triggerer ! AsyncPaymentTriggerer.Watch(peerReadyResultAdapter, nextPayload.outgoingNodeId, paymentHash, notifierTimeout)
context.system.eventStream ! EventStream.Publish(WaitingToRelayPayment(nextPayload.outgoingNodeId, paymentHash))
Behaviors.receiveMessagePartial {
case WrappedCurrentBlockHeight(blockHeight) if blockHeight >= safetyBlock =>
context.log.warn(s"rejecting async payment at block $blockHeight; was not triggered ${nodeParams.relayParams.asyncPaymentsParams.cancelSafetyBeforeTimeout} safety blocks before upstream cltv expiry at ${upstream.expiryIn}")
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentTimeout) =>
context.log.warn("rejecting async payment; was not triggered before block {}", notifierTimeout)
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case WrappedCurrentBlockHeight(blockHeight) if blockHeight >= timeoutBlock =>
context.log.warn(s"rejecting async payment at block $blockHeight; was not triggered after waiting ${nodeParams.relayParams.asyncPaymentsParams.holdTimeoutBlocks} blocks")
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case WrappedCurrentBlockHeight(_) =>
Behaviors.same
case CancelAsyncPayment =>
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentCanceled) =>
context.log.warn(s"payment sender canceled a waiting async payment")
rejectPayment(upstream, Some(TemporaryNodeFailure)) // TODO: replace failure type when async payment spec is finalized
stopping()
case RelayAsyncPayment =>
t-bast marked this conversation as resolved.
Show resolved Hide resolved
case WrappedPeerReadyResult(AsyncPaymentTriggerer.AsyncPaymentTriggered) =>
doSend(upstream, nextPayload, nextPacket)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair.payment.relay

import akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.ByteVector32
Expand Down Expand Up @@ -56,7 +57,7 @@ object NodeRelayer {
* NB: the payment secret used here is different from the invoice's payment secret and ensures we can
* group together HTLCs that the previous trampoline node sent in the same MPP.
*/
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
def apply(nodeParams: NodeParams, register: akka.actor.ActorRef, outgoingPaymentFactory: NodeRelay.OutgoingPaymentFactory, triggerer: typed.ActorRef[AsyncPaymentTriggerer.Command], children: Map[PaymentKey, ActorRef[NodeRelay.Command]] = Map.empty): Behavior[Command] =
Behaviors.setup { context =>
Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) {
Behaviors.receiveMessage {
Expand All @@ -71,15 +72,15 @@ object NodeRelayer {
case None =>
val relayId = UUID.randomUUID()
context.log.debug(s"spawning a new handler with relayId=$relayId")
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory), relayId.toString)
val handler = context.spawn(NodeRelay.apply(nodeParams, context.self, register, relayId, nodeRelayPacket, outgoingPaymentFactory, triggerer), relayId.toString)
context.log.debug("forwarding incoming htlc #{} from channel {} to new handler", htlcIn.id, htlcIn.channelId)
handler ! NodeRelay.Relay(nodeRelayPacket)
apply(nodeParams, register, outgoingPaymentFactory, children + (childKey -> handler))
apply(nodeParams, register, outgoingPaymentFactory, triggerer, children + (childKey -> handler))
}
case RelayComplete(childHandler, paymentHash, paymentSecret) =>
// we do a back-and-forth between parent and child before stopping the child to prevent a race condition
childHandler ! NodeRelay.Stop
apply(nodeParams, register, outgoingPaymentFactory, children - PaymentKey(paymentHash, paymentSecret))
apply(nodeParams, register, outgoingPaymentFactory, triggerer, children - PaymentKey(paymentHash, paymentSecret))
case GetPendingPayments(replyTo) =>
replyTo ! children
Behaviors.same
Expand Down
Loading