Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgresql support #1249

Merged
merged 77 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
039525f
Postgresql support
rorp Nov 9, 2019
46cc062
tests
rorp Nov 11, 2019
fbdd7d2
cleanup
rorp Nov 11, 2019
86003a6
configurable PSQL connections
rorp Nov 11, 2019
3f05472
types
rorp Nov 11, 2019
0bdaf07
connectioin pooling
rorp Nov 12, 2019
9ff9e29
fix backups
rorp Nov 14, 2019
d586f6d
data migration tool
rorp Dec 9, 2019
a00e632
Merge branch 'master' into postgresql
rorp Dec 10, 2019
b6d1be6
Merge branch 'master' into postgresql
rorp Dec 14, 2019
b9c0f41
A naive implementation of lease based dtabase locking
rorp Dec 16, 2019
bd6f184
bug fixes
rorp Dec 17, 2019
92bc3ac
some more changes
rorp Dec 17, 2019
0e54f05
fix unit tests
rorp Dec 17, 2019
f1266d0
responded to the PR comments
rorp Dec 18, 2019
48c5209
cleanup
rorp Dec 18, 2019
a4ee99b
use EmbeddedPostgres for unit tests
rorp Jan 2, 2020
949f1ec
Merge branch 'master' into postgresql
rorp Jan 2, 2020
da7d008
optimistic locking
rorp Jan 14, 2020
84825ff
Merge branch 'master' into postgresql
rorp Jan 14, 2020
8c2bb44
some more changes
rorp Jan 14, 2020
dc731d2
some more changes
rorp Jan 22, 2020
cbeab4b
Merge branch 'master' into postgresql
rorp Jan 22, 2020
e047f99
more checks for the data migration tool
rorp Jan 27, 2020
a7a11ec
Merge branch 'master' into postgresql
rorp Jan 27, 2020
307a164
cleanup error messages
rorp Jan 27, 2020
f39db74
Merge branch 'master' into postgresql
rorp Feb 7, 2020
52810d3
Merge branch 'master' into postgresql
rorp Feb 7, 2020
4c6e9fd
Update .gitignore
rorp Feb 14, 2020
a2b3fd5
move sqlite2psql into a separate jar
rorp Feb 14, 2020
bac5144
add some safety features
rorp Feb 18, 2020
22dde40
Merge branch 'master' into postgresql
rorp Mar 22, 2020
a6c72e0
docs
rorp Mar 23, 2020
f116bf3
Merge branch 'master' into postgresql
rorp Mar 28, 2020
2ef5df8
minor changes
rorp Apr 7, 2020
f6a5f72
update dependencies
rorp Apr 14, 2020
c40af27
Merge branch 'postgresql' of github.com:rorp/eclair into postgresql
rorp Apr 14, 2020
21591f6
Merge branch 'master' into postgresql
rorp Apr 16, 2020
f1e5070
update docs
rorp Apr 19, 2020
8b06006
respond to the comments
rorp Apr 19, 2020
99b5a57
move instanceId to NodeParams
rorp Apr 19, 2020
1ae6ed1
cleanup SqliteToPgsql
rorp Apr 24, 2020
05ae49a
Merge branch 'master' into postgresql
rorp Apr 24, 2020
126e5b7
remove psql.ownership-lease.lock-timeout
rorp Apr 24, 2020
ae3c07f
CanBackup
rorp Apr 25, 2020
88d9bbf
removee optimistic locking
rorp Apr 25, 2020
dae1868
Update eclair-core/src/main/scala/fr/acinq/eclair/db/psql/PsqlAuditDb…
rorp Apr 25, 2020
3b26670
remove unnecessary migrations
rorp Apr 27, 2020
60bff77
remove sqlite2psql
rorp Apr 30, 2020
ab73a91
update reference.conf
rorp Apr 30, 2020
e35c818
random instance id
rorp Apr 30, 2020
4852115
docs
rorp Apr 30, 2020
2ca5896
Merge branch 'master' into postgresql
rorp May 1, 2020
af4bfbb
backups doc
rorp May 4, 2020
894f91f
Merge branch 'master' into postgresql
rorp May 12, 2020
b806840
typo
rorp May 12, 2020
81d3cf5
minimum pgsql version
rorp May 12, 2020
e9976c9
remove Platform.currentTime
rorp May 12, 2020
c261716
some more changes
rorp May 12, 2020
292a600
Merge branch 'master' into postgresql
rorp May 23, 2020
c80500c
rename psql to pg
rorp May 23, 2020
e13b8e6
config and code cleanup
rorp May 23, 2020
b071899
instance ID -> UUID
rorp May 23, 2020
6ec8e06
clean up JdbcUtil
rorp May 23, 2020
a34241a
responded to the comments
rorp May 23, 2020
7d02697
Merge branch 'master' into postgresql
rorp Jun 15, 2020
1306803
fix typos
rorp Jun 15, 2020
8174983
change default transaction isolation level
rorp Jun 15, 2020
666dae1
fix reference.conf
rorp Jun 16, 2020
c09e215
Merge branch 'master' into postgresql
rorp Jun 26, 2020
b1c0fcc
typo
rorp Jun 30, 2020
9f903eb
move instanceID
rorp Jun 30, 2020
7dd234b
move initDatabases() into Databases
rorp Jun 30, 2020
3c67704
cleanup tests
rorp Jun 30, 2020
14af48d
CanBackup -> FileBackup
rorp Jun 30, 2020
bd52978
refactor Databases
rorp Jun 30, 2020
878324a
Merge branch 'master' into postgresql
rorp Jun 30, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions eclair-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@
<artifactId>sqlite-jdbc</artifactId>
<version>3.27.2.1</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<version>42.2.8</version>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
<version>2.4.13</version>
</dependency>
<dependency>
<!-- This is to get rid of '[WARNING] warning: Class javax.annotation.Nonnull not found - continuing with a stub.' compile errors -->
<groupId>com.google.code.findbugs</groupId>
Expand Down
22 changes: 22 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,28 @@ eclair {
port = 9051
private-key-file = "tor.dat"
}

db {
driver = "sqlite" // sqlite, psql
psql {
database = "eclair"
host = "localhost"
port = 5432
username = ""
password = ""
pool {
max-size = 10
connection-timeout = 30 seconds
idle-timeout = 10 minutes
max-life-time = 30 minutes
}
}
lock {
lease-interval = 5 minutes
lease-renew-interval = 1 minute
lock-timeout = 5 seconds
}
}
}

// do not edit or move this section
Expand Down
6 changes: 4 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair

import java.lang.management.ManagementFactory
import java.util.UUID

import akka.actor.ActorRef
Expand All @@ -41,7 +42,7 @@ import scodec.bits.ByteVector
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

case class GetInfoResponse(nodeId: PublicKey, alias: String, chainHash: ByteVector32, blockHeight: Int, publicAddresses: Seq[NodeAddress])
case class GetInfoResponse(nodeId: PublicKey, alias: String, chainHash: ByteVector32, blockHeight: Int, publicAddresses: Seq[NodeAddress], instanceId: String)

case class AuditResponse(sent: Seq[PaymentSent], received: Seq[PaymentReceived], relayed: Seq[PaymentRelayed])

Expand Down Expand Up @@ -306,7 +307,8 @@ class EclairImpl(appKit: Kit) extends Eclair {
alias = appKit.nodeParams.alias,
chainHash = appKit.nodeParams.chainHash,
blockHeight = appKit.nodeParams.currentBlockHeight.toInt,
publicAddresses = appKit.nodeParams.publicAddresses)
publicAddresses = appKit.nodeParams.publicAddresses,
instanceId = ManagementFactory.getRuntimeMXBean().getName())
)

override def usableBalances()(implicit timeout: Timeout): Future[Iterable[UsableBalance]] =
Expand Down
62 changes: 46 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.crypto.LocalKeyManager
import fr.acinq.eclair.db.{BackupHandler, Databases}
import fr.acinq.eclair.io.{Authenticator, Server, Switchboard}
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.payment.Auditor
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{CommandBuffer, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.router._
import fr.acinq.eclair.tor.TorProtocolHandler.OnionServiceVersion
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
Expand Down Expand Up @@ -92,32 +92,30 @@ class Setup(datadir: File,
val chaindir = new File(datadir, chain)
val keyManager = new LocalKeyManager(seed, NodeParams.makeChainHash(chain))

val database = db match {
case Some(d) => d
case None => Databases.sqliteJDBC(chaindir)
}
val database = initDatabase(config.getConfig("db"))

/**
* This counter holds the current blockchain height.
* It is mainly used to calculate htlc expiries.
* The value is read by all actors, hence it needs to be thread-safe.
*/
* This counter holds the current blockchain height.
* It is mainly used to calculate htlc expiries.
* The value is read by all actors, hence it needs to be thread-safe.
*/
val blockCount = new AtomicLong(0)

/**
* This holds the current feerates, in satoshi-per-kilobytes.
* The value is read by all actors, hence it needs to be thread-safe.
*/
* This holds the current feerates, in satoshi-per-kilobytes.
* The value is read by all actors, hence it needs to be thread-safe.
*/
val feeratesPerKB = new AtomicReference[FeeratesPerKB](null)

/**
* This holds the current feerates, in satoshi-per-kw.
* The value is read by all actors, hence it needs to be thread-safe.
*/
* This holds the current feerates, in satoshi-per-kw.
* The value is read by all actors, hence it needs to be thread-safe.
*/
val feeratesPerKw = new AtomicReference[FeeratesPerKw](null)

val feeEstimator = new FeeEstimator {
override def getFeeratePerKb(target: Int): Long = feeratesPerKB.get().feePerBlock(target)

override def getFeeratePerKw(target: Int): Long = feeratesPerKw.get().feePerBlock(target)
}

Expand Down Expand Up @@ -349,12 +347,44 @@ class Setup(datadir: File,
None
}
}

private def initDatabase(dbConfig: Config): Databases = {
try {
val database = db match {
case Some(d) => d
case None =>
dbConfig.getString("driver") match {
case "sqlite" => Databases.sqliteJDBC(chaindir)
case "psql" => Databases.setupPsqlDatabases(dbConfig)
case _ => throw new RuntimeException(s"Unknown database driver `${dbConfig.getString("driver")}`")
}
}
val dbLockLeaseRenewInterval = dbConfig.getDuration("lock.lease-renew-interval").toSeconds.seconds
system.scheduler.schedule(dbLockLeaseRenewInterval, dbLockLeaseRenewInterval) {
try {
database.obtainExclusiveLock()
} catch {
case e: Throwable =>
logger.error("Cannot lock database. Exiting...", e)
sys.exit(-2)
}
}
database
} catch {
case e: Throwable =>
logger.error("Cannot initialize database. Exiting...", e)
sys.exit(-1)
}
}
}

// @formatter:off
sealed trait Bitcoin

case class Bitcoind(bitcoinClient: BasicBitcoinJsonRPCClient) extends Bitcoin

case class Electrum(electrumClient: ActorRef) extends Bitcoin

// @formatter:on

case class Kit(nodeParams: NodeParams,
Expand Down
36 changes: 17 additions & 19 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/BackupHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,26 @@ class BackupHandler private(databases: Databases, backupFile: File, backupScript

def receive = {
case persisted: ChannelPersisted =>
val start = System.currentTimeMillis()
val tmpFile = new File(backupFile.getAbsolutePath.concat(".tmp"))
databases.backup(tmpFile)
// this will throw an exception if it fails, which is possible if the backup file is not on the same filesystem
// as the temporary file
Files.move(tmpFile.toPath, backupFile.toPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
val end = System.currentTimeMillis()
if (databases.isBackupSupported) {
val start = System.currentTimeMillis()
databases.backup(backupFile)
val end = System.currentTimeMillis()

// publish a notification that we have updated our backup
context.system.eventStream.publish(BackupCompleted)
// publish a notification that we have updated our backup
context.system.eventStream.publish(BackupCompleted)

log.info(s"database backup triggered by channelId=${persisted.channelId} took ${end - start}ms")
log.info(s"database backup triggered by channelId=${persisted.channelId} took ${end - start}ms")

backupScript_opt.foreach(backupScript => {
Try {
// run the script in the current thread and wait until it terminates
Process(backupScript).!
} match {
case Success(exitCode) => log.info(s"backup notify script $backupScript returned $exitCode")
case Failure(cause) => log.warning(s"cannot start backup notify script $backupScript: $cause")
}
})
backupScript_opt.foreach(backupScript => {
Try {
// run the script in the current thread and wait until it terminates
Process(backupScript).!
} match {
case Success(exitCode) => log.info(s"backup notify script $backupScript returned $exitCode")
case Failure(cause) => log.warning(s"cannot start backup notify script $backupScript: $cause")
}
})
}
}
}

Expand Down
107 changes: 100 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
package fr.acinq.eclair.db

import java.io.File
import java.lang.management.ManagementFactory
import java.nio.file.{Files, StandardCopyOption}
import java.sql.{Connection, DriverManager}
import java.util.concurrent.TimeUnit

import com.typesafe.config.Config
import fr.acinq.eclair.db.psql._
import fr.acinq.eclair.db.sqlite._
import grizzled.slf4j.Logging
import org.sqlite.SQLiteException
import javax.sql.DataSource

import scala.concurrent.duration._

trait Databases {

Expand All @@ -38,6 +45,10 @@ trait Databases {
val pendingRelay: PendingRelayDb

def backup(file: File): Unit

val isBackupSupported: Boolean

def obtainExclusiveLock(): Unit
}

object Databases extends Logging {
Expand All @@ -57,9 +68,8 @@ object Databases extends Logging {
sqliteEclair = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "eclair.sqlite")}")
sqliteNetwork = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "network.sqlite")}")
sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "audit.sqlite")}")
SqliteUtils.obtainExclusiveLock(sqliteEclair) // there should only be one process writing to this file
logger.info("successful lock on eclair.sqlite")
databaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair)
sqliteDatabaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair)
} catch {
case t: Throwable => {
logger.error("could not create connection to sqlite databases: ", t)
Expand All @@ -69,23 +79,106 @@ object Databases extends Logging {
throw t
}
}
}

def postgresJDBC(database: String = "eclair", host: String = "localhost", port: Int = 5432,
username: Option[String] = None, password: Option[String] = None,
poolProperties: Map[String, Long] = Map(),
instanceId: String = ManagementFactory.getRuntimeMXBean().getName(),
lockLeaseInterval: FiniteDuration = 5.minutes,
lockTimeout: FiniteDuration = 5.seconds): Databases = {
try {
val url = s"jdbc:postgresql://${host}:${port}/${database}"

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

val config = new HikariConfig()
config.setJdbcUrl(url)
username.foreach(config.setUsername)
password.foreach(config.setPassword)
poolProperties.get("max-size").foreach(x => config.setMaximumPoolSize(x.toInt))
poolProperties.get("connection-timeout").foreach(config.setConnectionTimeout)
poolProperties.get("idle-timeout").foreach(config.setIdleTimeout)
poolProperties.get("max-life-time").foreach(config.setMaxLifetime)

implicit val ds: DataSource = new HikariDataSource(config)

PsqlUtils.obtainExclusiveLock(instanceId, lockLeaseInterval, lockTimeout)

new Databases {
override val network = new PsqlNetworkDb()
override val audit = new PsqlAuditDb()
override val channels = new PsqlChannelsDb()
override val peers = new PsqlPeersDb()
override val payments = new PsqlPaymentsDb()
override val pendingRelay = new PsqlPendingRelayDb()
override def backup(file: File): Unit = throw new RuntimeException("psql driver does not support channels backup")
override val isBackupSupported: Boolean = false
override def obtainExclusiveLock(): Unit = PsqlUtils.obtainExclusiveLock(instanceId, lockLeaseInterval, lockTimeout)
}
} catch {
case t: Throwable => {
logger.error("could not create connection to psql database: ", t)
throw t
}
}
}

def databaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection) = new Databases {
def sqliteDatabaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = new Databases {
override val network = new SqliteNetworkDb(networkJdbc)
override val audit = new SqliteAuditDb(auditJdbc)
override val channels = new SqliteChannelsDb(eclairJdbc)
override val peers = new SqlitePeersDb(eclairJdbc)
override val payments = new SqlitePaymentsDb(eclairJdbc)
override val pendingRelay = new SqlitePendingRelayDb(eclairJdbc)
override def backup(backupFile: File): Unit = {
val tmpFile = new File(backupFile.getAbsolutePath.concat(".tmp"))

override def backup(file: File): Unit = {
SqliteUtils.using(eclairJdbc.createStatement()) {
statement => {
statement.executeUpdate(s"backup to ${file.getAbsolutePath}")
statement => {
statement.executeUpdate(s"backup to ${tmpFile.getAbsolutePath}")
}
}

// this will throw an exception if it fails, which is possible if the backup file is not on the same filesystem
// as the temporary file
Files.move(tmpFile.toPath, backupFile.toPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
}
override val isBackupSupported: Boolean = true

override def obtainExclusiveLock(): Unit = ()
}

def setupPsqlDatabases(dbConfig: Config): Databases = {
val database = dbConfig.getString("psql.database")
val host = dbConfig.getString("psql.host")
val port = dbConfig.getInt("psql.port")
val username = if (dbConfig.getIsNull("psql.username") || dbConfig.getString("psql.username").isBlank)
None
else
Some(dbConfig.getString("psql.username"))
val password = if (dbConfig.getIsNull("psql.password") || dbConfig.getString("psql.password").isBlank)
None
else
Some(dbConfig.getString("psql.password"))
val properties = {
val poolConfig = dbConfig.getConfig("psql.pool")
Map.empty
.updated("max-size", poolConfig.getInt("max-size").toLong)
.updated("connection-timeout", poolConfig.getDuration("connection-timeout").toMillis)
.updated("idle-timeout", poolConfig.getDuration("idle-timeout").toMillis)
.updated("max-life-time", poolConfig.getDuration("max-life-time").toMillis)

}
val lockLeaseInterval = dbConfig.getDuration("lock.lease-interval").toSeconds.seconds
val lockTimeout = dbConfig.getDuration("lock.lock-timeout").toSeconds.seconds

Databases.postgresJDBC(
database = database, host = host, port = port,
username = username, password = password,
poolProperties = properties,
lockLeaseInterval = lockLeaseInterval, lockTimeout = lockTimeout
)
}

}
Loading