Skip to content

Commit

Permalink
Add trampoline info to auditDB (#1767)
Browse files Browse the repository at this point in the history
* Add trampoline info to auditDB

Add a new table containing the recipient and amount sent to the recipient in case of trampoline relaying.
When using trampoline, the recipient may not be the next node on the path.
  • Loading branch information
thomash-acinq authored Apr 15, 2021
1 parent d0e79fa commit 15ddc17
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
.withTag(PaymentTags.Relay, PaymentTags.RelayType(e))
.record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
e match {
case TrampolinePaymentRelayed(_, incoming, outgoing, _) =>
case TrampolinePaymentRelayed(_, incoming, outgoing, _, _, _) =>
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(incoming.length)
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length)
incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived))
Expand Down
41 changes: 38 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong}
import grizzled.slf4j.Logging

import java.sql.Statement
import java.util.UUID
import javax.sql.DataSource
import scala.collection.immutable.Queue
Expand All @@ -38,18 +40,28 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 4
val CURRENT_VERSION = 5

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)

inTransaction { pg =>
using(pg.createStatement()) { statement =>
def migration45(statement: Statement): Int = {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
}

getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 4 =>
logger.warn(s"migrating db $DB_NAME, found version=4 current=$CURRENT_VERSION")
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp BIGINT NOT NULL)")
Expand All @@ -58,6 +70,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)")
Expand Down Expand Up @@ -124,7 +138,14 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, ts) =>
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
using(pg.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, nextTrampolineAmount.toLong)
statement.setString(3, nextTrampolineNodeId.value.toHex)
statement.setLong(4, e.timestamp)
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
}
Expand Down Expand Up @@ -231,6 +252,18 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] =
inTransaction { pg =>
var trampolineByHash = Map.empty[ByteVector32, (MilliSatoshi, PublicKey)]
using(pg.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
val rs = statement.executeQuery()
while (rs.next()) {
val paymentHash = rs.getByteVector32FromHex("payment_hash")
val amount = MilliSatoshi(rs.getLong("amount_msat"))
val nodeId = PublicKey(rs.getByteVectorFromHex("next_node_id"))
trampolineByHash += (paymentHash -> (amount, nodeId))
}
}
using(pg.prepareStatement("SELECT * FROM relayed WHERE timestamp >= ? AND timestamp < ? ORDER BY timestamp")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
Expand All @@ -256,7 +289,9 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, timestamp) :: Nil
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) =>
val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey))
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.transactions.Transactions.PlaceHolderPubKey
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong}
import grizzled.slf4j.Logging

Expand All @@ -38,7 +39,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 4
val CURRENT_VERSION = 5

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)

Expand Down Expand Up @@ -75,26 +76,40 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)")
}

def migration45(statement: Statement): Int = {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, next_node_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
}

getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 1 => // previous version let's migrate
logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
migration34(statement)
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 2 =>
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
migration23(statement)
migration34(statement)
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 3 =>
logger.warn(s"migrating db $DB_NAME, found version=3 current=$CURRENT_VERSION")
migration34(statement)
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 4 =>
logger.warn(s"migrating db $DB_NAME, found version=4 current=$CURRENT_VERSION")
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat INTEGER NOT NULL, payment_hash BLOB NOT NULL, from_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, channel_id BLOB NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash BLOB NOT NULL, amount_msat INTEGER NOT NULL, next_node_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, tx_id BLOB NOT NULL, fee_sat INTEGER NOT NULL, tx_type TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
Expand All @@ -103,6 +118,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)")
Expand Down Expand Up @@ -160,9 +177,17 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, ts) =>
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
using(sqlite.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setBytes(1, e.paymentHash.toArray)
statement.setLong(2, nextTrampolineAmount.toLong)
statement.setBytes(3, nextTrampolineNodeId.value.toArray)
statement.setLong(4, e.timestamp)
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++
outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
}
for (p <- payments) {
using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
Expand Down Expand Up @@ -256,7 +281,19 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
receivedByHash.values.toSeq.sortBy(_.timestamp)
}

override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] =
override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] = {
var trampolineByHash = Map.empty[ByteVector32, (MilliSatoshi, PublicKey)]
using(sqlite.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
val rs = statement.executeQuery()
while (rs.next()) {
val paymentHash = rs.getByteVector32("payment_hash")
val amount = MilliSatoshi(rs.getLong("amount_msat"))
val nodeId = PublicKey(rs.getByteVector("next_node_id"))
trampolineByHash += (paymentHash -> (amount, nodeId))
}
}
using(sqlite.prepareStatement("SELECT * FROM relayed WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
Expand All @@ -282,11 +319,14 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) => TrampolinePaymentRelayed(paymentHash, incoming, outgoing, timestamp) :: Nil
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) =>
val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey))
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
}
}

override def listNetworkFees(from: Long, to: Long): Seq[NetworkFee] =
using(sqlite.prepareStatement("SELECT * FROM network_fees WHERE timestamp >= ? AND timestamp < ? ORDER BY timestamp")) { statement =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ sealed trait PaymentRelayed extends PaymentEvent {

case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed

case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed {
case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi, timestamp: Long = System.currentTimeMillis) extends PaymentRelayed {
override val amountIn: MilliSatoshi = incoming.map(_.amount).sum
override val amountOut: MilliSatoshi = outgoing.map(_.amount).sum
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ class NodeRelay private(nodeParams: NodeParams,
}
val incoming = upstream.adds.map(add => PaymentRelayed.Part(add.amountMsat, add.channelId))
val outgoing = paymentSent.parts.map(part => PaymentRelayed.Part(part.amountWithFees, part.toChannelId))
context.system.eventStream ! EventStream.Publish(TrampolinePaymentRelayed(paymentHash, incoming, outgoing))
context.system.eventStream ! EventStream.Publish(TrampolinePaymentRelayed(paymentHash, incoming, outgoing, paymentSent.recipientNodeId, paymentSent.recipientAmount))
}

}
Loading

0 comments on commit 15ddc17

Please sign in to comment.