Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Postgres code #1743

Merged
merged 17 commits into from
Mar 31, 2021
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Setup(datadir: File,

logger.info(s"instanceid=$instanceId")

val databases = Databases.init(config.getConfig("db"), instanceId, datadir, chaindir, db)
val databases = Databases.init(config.getConfig("db"), instanceId, chaindir, db)

/**
* This counter holds the current blockchain height.
Expand Down
300 changes: 141 additions & 159 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,89 +16,146 @@

package fr.acinq.eclair.db

import java.io.File
import java.nio.file._
import java.sql.{Connection, DriverManager}
import java.util.UUID

import akka.actor.ActorSystem
import com.typesafe.config.Config
import fr.acinq.eclair.db.pg.PgUtils.LockType.LockType
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler
import fr.acinq.eclair.db.pg.PgUtils._
import fr.acinq.eclair.db.pg._
import fr.acinq.eclair.db.sqlite._
import grizzled.slf4j.Logging
import javax.sql.DataSource

import java.io.File
import java.nio.file._
import java.sql.{Connection, DriverManager}
import java.util.UUID
import scala.concurrent.duration._

trait Databases {
//@formatter:off
def network: NetworkDb
def audit: AuditDb
def channels: ChannelsDb
def peers: PeersDb
def payments: PaymentsDb
def pendingRelay: PendingRelayDb
//@formatter:on
}

val network: NetworkDb
object Databases extends Logging {

val audit: AuditDb
trait FileBackup {
this: Databases =>
def backup(backupFile: File): Unit
}

val channels: ChannelsDb
trait ExclusiveLock {
this: Databases =>
def obtainExclusiveLock(): Unit
}

val peers: PeersDb
case class SqliteDatabases private (network: SqliteNetworkDb,
audit: SqliteAuditDb,
channels: SqliteChannelsDb,
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
pendingRelay: SqlitePendingRelayDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
statement => {
statement.executeUpdate(s"backup to ${backupFile.getAbsolutePath}")
}
}
}

val payments: PaymentsDb
object SqliteDatabases {
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = SqliteDatabases(
network = new SqliteNetworkDb(networkJdbc),
audit = new SqliteAuditDb(auditJdbc),
channels = new SqliteChannelsDb(eclairJdbc),
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
pendingRelay = new SqlitePendingRelayDb(eclairJdbc),
backupConnection = eclairJdbc
)
}

val pendingRelay: PendingRelayDb
}
case class PostgresDatabases private (network: PgNetworkDb,
audit: PgAuditDb,
channels: PgChannelsDb,
peers: PgPeersDb,
payments: PgPaymentsDb,
pendingRelay: PgPendingRelayDb,
dataSource: HikariDataSource,
lock: PgLock) extends Databases with ExclusiveLock {
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
}

object Databases extends Logging {
object PostgresDatabases {
def apply(hikariConfig: HikariConfig,
instanceId: UUID,
lock: PgLock = PgLock.NoLock,
jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = {

jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))

implicit val ds: HikariDataSource = new HikariDataSource(hikariConfig)
implicit val implicitLock: PgLock = lock

val databases = PostgresDatabases(
network = new PgNetworkDb,
audit = new PgAuditDb,
channels = new PgChannelsDb,
peers = new PgPeersDb,
payments = new PgPaymentsDb,
pendingRelay = new PgPendingRelayDb,
dataSource = ds,
lock = lock)

lock match {
case PgLock.NoLock => ()
case l: PgLock.LeaseLock =>
// we obtain a lock right now...
databases.obtainExclusiveLock()
// ...and renew the lease regularly
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock())
}

trait FileBackup { this: Databases =>
def backup(backupFile: File): Unit
}
databases
}

trait ExclusiveLock { this: Databases =>
def obtainExclusiveLock(): Unit
private def checkIfDatabaseUrlIsUnchanged(url: String, urlFile: File): Unit = {
def readString(path: Path): String = Files.readAllLines(path).get(0)

def writeString(path: Path, string: String): Unit = Files.write(path, java.util.Arrays.asList(string))

if (urlFile.exists()) {
val oldUrl = readString(urlFile.toPath)
if (oldUrl != url)
throw JdbcUrlChanged(oldUrl, url)
} else {
writeString(urlFile.toPath, url)
}
}
}

def init(dbConfig: Config, instanceId: UUID, datadir: File, chaindir: File, db: Option[Databases] = None)(implicit system: ActorSystem): Databases = {
def init(dbConfig: Config, instanceId: UUID, chaindir: File, db: Option[Databases] = None)(implicit system: ActorSystem): Databases = {
db match {
case Some(d) => d
case None =>
dbConfig.getString("driver") match {
case "sqlite" => Databases.sqliteJDBC(chaindir)
case "postgres" =>
val pg = Databases.setupPgDatabases(dbConfig, instanceId, datadir, { ex =>
logger.error("fatal error: Cannot obtain lock on the database.\n", ex)
sys.exit(-2)
})
if (LockType(dbConfig.getString("postgres.lock-type")) == LockType.LEASE) {
val dbLockLeaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
val dbLockLeaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
if (dbLockLeaseInterval <= dbLockLeaseRenewInterval)
throw new RuntimeException("Invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(dbLockLeaseRenewInterval, dbLockLeaseRenewInterval)(new Runnable {
override def run(): Unit = {
try {
pg.obtainExclusiveLock()
} catch {
case e: Throwable =>
logger.error("fatal error: Cannot obtain the database lease.\n", e)
sys.exit(-1)
}
}
})
}
pg
case driver => throw new RuntimeException(s"Unknown database driver `$driver`")
case "sqlite" => Databases.sqlite(chaindir)
case "postgres" => Databases.postgres(dbConfig, instanceId, chaindir)
case driver => throw new RuntimeException(s"unknown database driver `$driver`")
}
}
}

/**
* Given a parent folder it creates or loads all the databases from a JDBC connection
*
* @param dbdir
* @return
*/
def sqliteJDBC(dbdir: File): Databases = {
* Given a parent folder it creates or loads all the databases from a JDBC connection
*/
def sqlite(dbdir: File): Databases = {
dbdir.mkdir()
var sqliteEclair: Connection = null
var sqliteNetwork: Connection = null
Expand All @@ -109,127 +166,52 @@ object Databases extends Logging {
sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "audit.sqlite")}")
SqliteUtils.obtainExclusiveLock(sqliteEclair) // there should only be one process writing to this file
logger.info("successful lock on eclair.sqlite")
sqliteDatabaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair)
SqliteDatabases(sqliteAudit, sqliteNetwork, sqliteEclair)
} catch {
case t: Throwable => {
case t: Throwable =>
logger.error("could not create connection to sqlite databases: ", t)
if (sqliteEclair != null) sqliteEclair.close()
if (sqliteNetwork != null) sqliteNetwork.close()
if (sqliteAudit != null) sqliteAudit.close()
throw t
}
}
}

def postgresJDBC(database: String, host: String, port: Int,
username: Option[String], password: Option[String],
poolProperties: Map[String, Long],
instanceId: UUID,
databaseLeaseInterval: FiniteDuration,
lockExceptionHandler: LockExceptionHandler = { _ => () },
lockType: LockType = LockType.NONE, datadir: File): Databases with ExclusiveLock = {
val url = s"jdbc:postgresql://${host}:${port}/${database}"

checkIfDatabaseUrlIsUnchanged(url, datadir)

implicit val lock: DatabaseLock = lockType match {
case LockType.NONE => NoLock
case LockType.LEASE => LeaseLock(instanceId, databaseLeaseInterval, lockExceptionHandler)
case _ => throw new RuntimeException(s"Unknown postgres lock type: `$lockType`")
}

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

val config = new HikariConfig()
config.setJdbcUrl(url)
username.foreach(config.setUsername)
password.foreach(config.setPassword)
poolProperties.get("max-size").foreach(x => config.setMaximumPoolSize(x.toInt))
poolProperties.get("connection-timeout").foreach(config.setConnectionTimeout)
poolProperties.get("idle-timeout").foreach(config.setIdleTimeout)
poolProperties.get("max-life-time").foreach(config.setMaxLifetime)

implicit val ds: DataSource = new HikariDataSource(config)

val databases = new Databases with ExclusiveLock {
override val network = new PgNetworkDb
override val audit = new PgAuditDb
override val channels = new PgChannelsDb
override val peers = new PgPeersDb
override val payments = new PgPaymentsDb
override val pendingRelay = new PgPendingRelayDb
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock
}
databases.obtainExclusiveLock()
databases
}

def sqliteDatabaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = new Databases with FileBackup {
override val network = new SqliteNetworkDb(networkJdbc)
override val audit = new SqliteAuditDb(auditJdbc)
override val channels = new SqliteChannelsDb(eclairJdbc)
override val peers = new SqlitePeersDb(eclairJdbc)
override val payments = new SqlitePaymentsDb(eclairJdbc)
override val pendingRelay = new SqlitePendingRelayDb(eclairJdbc)
override def backup(backupFile: File): Unit = {

SqliteUtils.using(eclairJdbc.createStatement()) {
statement => {
statement.executeUpdate(s"backup to ${backupFile.getAbsolutePath}")
}
}

}
}

def setupPgDatabases(dbConfig: Config, instanceId: UUID, datadir: File, lockExceptionHandler: LockExceptionHandler): Databases with ExclusiveLock = {
def postgres(dbConfig: Config, instanceId: UUID, dbdir: File, lockExceptionHandler: LockFailureHandler = LockFailureHandler.logAndStop)(implicit system: ActorSystem): PostgresDatabases = {
val database = dbConfig.getString("postgres.database")
val host = dbConfig.getString("postgres.host")
val port = dbConfig.getInt("postgres.port")
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty)
None
else
Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty)
None
else
Some(dbConfig.getString("postgres.password"))
val properties = {
val poolConfig = dbConfig.getConfig("postgres.pool")
Map.empty
.updated("max-size", poolConfig.getInt("max-size").toLong)
.updated("connection-timeout", poolConfig.getDuration("connection-timeout").toMillis)
.updated("idle-timeout", poolConfig.getDuration("idle-timeout").toMillis)
.updated("max-life-time", poolConfig.getDuration("max-life-time").toMillis)

val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))

val hikariConfig = new HikariConfig()
hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")
username.foreach(hikariConfig.setUsername)
password.foreach(hikariConfig.setPassword)
val poolConfig = dbConfig.getConfig("postgres.pool")
hikariConfig.setMaximumPoolSize(poolConfig.getInt("max-size"))
hikariConfig.setConnectionTimeout(poolConfig.getDuration("connection-timeout").toMillis)
hikariConfig.setIdleTimeout(poolConfig.getDuration("idle-timeout").toMillis)
hikariConfig.setMaxLifetime(poolConfig.getDuration("max-life-time").toMillis)

val lock = dbConfig.getString("postgres.lock-type") match {
case "none" => PgLock.NoLock
case "lease" =>
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler)
case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`")
}
val lockType = LockType(dbConfig.getString("postgres.lock-type"))
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds

Databases.postgresJDBC(
database = database, host = host, port = port,
username = username, password = password,
poolProperties = properties,
val jdbcUrlFile = new File(dbdir, "last_jdbcurl")

Databases.PostgresDatabases(
hikariConfig = hikariConfig,
instanceId = instanceId,
databaseLeaseInterval = leaseInterval,
lockExceptionHandler = lockExceptionHandler, lockType = lockType, datadir = datadir
lock = lock,
jdbcUrlFile_opt = Some(jdbcUrlFile)
)
}

private def checkIfDatabaseUrlIsUnchanged(url: String, datadir: File ): Unit = {
val urlFile = new File(datadir, "last_jdbcurl")

def readString(path: Path): String = Files.readAllLines(path).get(0)

def writeString(path: Path, string: String): Unit = Files.write(path, java.util.Arrays.asList(string))

if (urlFile.exists()) {
val oldUrl = readString(urlFile.toPath)
if (oldUrl != url)
throw new RuntimeException(s"The database URL has changed since the last start. It was `$oldUrl`, now it's `$url`")
} else {
writeString(urlFile.toPath, url)
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.pg.PgUtils.DatabaseLock
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging

import java.sql.Statement
import javax.sql.DataSource
import scala.collection.immutable.Queue

class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends ChannelsDb with Logging {
class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {

import PgUtils.ExtendedResultSet._
import PgUtils._
Expand Down
Loading