kvdb: add postgres #5366
@@ -0,0 +1,9 @@
package postgres

import "time"

// Config holds postgres configuration data.
type Config struct {
	Dsn string `long:"dsn" description:"Database connection string."`
Review comment: Where's …
Reply: Data Source Name, I thought it was a generic thing in SQL land, but maybe not.
	Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."`
}
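For illustration, a caller might populate this config with a standard postgres connection string. The DSN below (host, credentials, database name) and the timeout value are placeholder assumptions, not values from this PR; this is a minimal self-contained sketch rather than lnd's actual config wiring.

```go
package main

import (
	"fmt"
	"time"
)

// Config mirrors the struct from this diff so the sketch is self-contained.
type Config struct {
	Dsn     string
	Timeout time.Duration
}

func main() {
	// Placeholder DSN; any pgx-compatible connection string works here.
	cfg := &Config{
		Dsn:     "postgres://lnduser:lndpass@localhost:5432/lnd?sslmode=disable",
		Timeout: 10 * time.Second,
	}

	fmt.Println(cfg.Dsn, cfg.Timeout)
}
```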
@@ -0,0 +1,241 @@
// +build kvdb_postgres

package postgres
Review comment: Is anything in this file actually postgres-specific? Would replacing the database conn string w/ MySQL just work out of the box?
Reply: No, it wouldn't, because some of the SQL is db-engine specific. Maybe it could be generalized, but this could have a performance impact. Also, some special operations like listing tables don't have a generic SQL statement for it, afaik.

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/btcsuite/btcwallet/walletdb"
	_ "github.com/jackc/pgx/v4/stdlib"
)

const (
	// kvTableName is the name of the table that will contain all the kv
	// pairs.
	kvTableName = "kv"
)

// KV stores a key/value pair.
type KV struct {
	key string
	val string
}

// db holds a reference to the postgres connection.
type db struct {
	// cfg is the postgres connection config.
	cfg *Config

	// prefix is the table name prefix that is used to simulate namespaces.
Review comment: Shouldn't this be a top-level argument in the main config? As it would allow multiple …
Reply: What is the gain of having multiple lnds write to the same db in diff namespaces? I thought it is safer to keep them isolated. For a cluster of nodes that operate as one, namespace isolation is probably not necessary?
Review comment: It would make it possible to run multiple …
Reply: Oh, but there is also the …
Review comment: Makes sense, had initially missed that!
	// We don't use schemas because at least sqlite does not support that.
	prefix string

	// ctx is the overall context for the database driver.
	//
	// TODO: This is an anti-pattern that is in place until the kvdb
	// interface supports a context.
	ctx context.Context

	// db is the underlying database connection instance.
	db *sql.DB

	// lock is the global write lock that ensures single writer.
	lock sync.RWMutex

	// table is the name of the table that contains the data for all
	// top-level buckets that have keys that cannot be mapped to a distinct
	// sql table.
	table string
}

// Enforce db implements the walletdb.DB interface.
var _ walletdb.DB = (*db)(nil)
// newPostgresBackend returns a db object initialized with the passed backend
// config. If the postgres connection cannot be established, an error is
// returned.
func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
	*db, error) {

	if prefix == "" {
		return nil, errors.New("empty postgres prefix")
	}

	dbConn, err := sql.Open("pgx", config.Dsn)
Review comment: Related to the comment above about things being postgres-specific or not. Can we just elevate this to a config-level value (the driver name)? Would be cool to be able to support everything listed here "out of the box" (traditional asterisk applies ofc lol): https://github.com/golang/go/wiki/SQLDrivers#drivers
Reply: Yes, so adding a new backend isn't that simple. Also, locking models are different. Not a problem in this single-writer PR, but when switching to serializable, the details of the engine do start to matter.
	if err != nil {
		return nil, err
	}

	// Compose system table names.
	table := fmt.Sprintf(
		"%s_%s", prefix, kvTableName,
	)

	// Execute the create statements to set up a kv table in postgres. Every
	// row points to the bucket that it is in via its parent_id field. A
	// NULL parent_id means that the key belongs to the upper-most bucket in
	// this table. A constraint on parent_id enforces referential integrity.
	//
	// Furthermore, there is a <table>_p index on parent_id that is required
	// for the foreign key constraint.
	//
	// Finally, there are unique indices on (parent_id, key) to prevent the
	// same key being present in a bucket more than once (<table>_up and
	// <table>_unp). In postgres, a single index wouldn't enforce the unique
	// constraint on rows with a NULL parent_id. Therefore two indices are
	// defined.
	_, err = dbConn.ExecContext(ctx, `
		CREATE SCHEMA IF NOT EXISTS public;
		CREATE TABLE IF NOT EXISTS public.`+table+`
		(
			key bytea NOT NULL,
			value bytea,
			parent_id bigint,
			id bigserial PRIMARY KEY,
			sequence bigint,
			CONSTRAINT `+table+`_parent FOREIGN KEY (parent_id)
				REFERENCES public.`+table+` (id)
				ON UPDATE NO ACTION
				ON DELETE CASCADE
		);

		CREATE INDEX IF NOT EXISTS `+table+`_p
			ON public.`+table+` (parent_id);

		CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_up
			ON public.`+table+`
			(parent_id, key) WHERE parent_id IS NOT NULL;

		CREATE UNIQUE INDEX IF NOT EXISTS `+table+`_unp
			ON public.`+table+` (key) WHERE parent_id IS NULL;
	`)
	if err != nil {
		_ = dbConn.Close()

		return nil, err
	}

	backend := &db{
		cfg:    config,
		prefix: prefix,
		ctx:    ctx,
		db:     dbConn,
		table:  table,
	}

	return backend, nil
}
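To see why the comment above calls for two partial indices rather than one, here is a minimal sketch, assuming a local test database that already has the schema from newPostgresBackend and a prefixed table named test_kv (the DSN and table name are assumptions, not part of this diff). Under a plain unique index on (parent_id, key), two top-level rows with the same key would both be accepted, because their NULL parent_id values never compare equal; the <table>_unp index is what rejects the second insert.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

func main() {
	// Placeholder DSN; adjust for a local test database.
	db, err := sql.Open("pgx", "postgres://lnduser:lndpass@localhost:5432/lnd_test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()
	insert := `INSERT INTO public.test_kv (key, value, parent_id)
		VALUES ($1, $2, NULL)`

	// First top-level key insert succeeds.
	_, err = db.ExecContext(ctx, insert, []byte("k"), []byte("v1"))
	fmt.Println("first insert:", err)

	// Second insert of the same top-level key hits the test_kv_unp partial
	// unique index and fails with a unique violation.
	_, err = db.ExecContext(ctx, insert, []byte("k"), []byte("v2"))
	fmt.Println("second insert:", err)
}
```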
// getTimeoutCtx gets a timeout context for database requests.
func (db *db) getTimeoutCtx() (context.Context, func()) {
	if db.cfg.Timeout == time.Duration(0) {
		return db.ctx, func() {}
	}

	return context.WithTimeout(db.ctx, db.cfg.Timeout)
}
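The rest of the driver is expected to wrap individual statements with this context so that each call gets its own deadline when a timeout is configured. A hypothetical method (not part of this diff) showing the intended pattern:

```go
// keyCount is an illustrative example method: it derives a per-call context
// from getTimeoutCtx, runs a single query against the kv table, and always
// releases the context via the returned cancel function.
func (db *db) keyCount() (int64, error) {
	ctx, cancel := db.getTimeoutCtx()
	defer cancel()

	var n int64
	err := db.db.QueryRowContext(
		ctx, "SELECT count(*) FROM public."+db.table,
	).Scan(&n)

	return n, err
}
```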
// getPrefixedTableName returns a table name for this prefix (namespace).
func (db *db) getPrefixedTableName(table string) string {
	return fmt.Sprintf("%s_%s", db.prefix, table)
}

// catchPanic executes the specified function. If a panic occurs, it is
// returned as an error value.
func catchPanic(f func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = r.(error)
			log.Criticalf("Caught unhandled error: %v", err)
		}
	}()

	err = f()

	return
}

// View opens a database read transaction and executes the function f with the
// transaction passed as a parameter. After f exits, the transaction is rolled
// back. If f errors, its error is returned, not a rollback error (if any
// occur). The passed reset function is called before the start of the
// transaction and can be used to reset intermediate state. As callers may
// expect retries of the f closure (depending on the database backend used),
// the reset function will be called before each retry respectively.
func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error {
	return db.executeTransaction(
		func(tx walletdb.ReadWriteTx) error {
			return f(tx.(walletdb.ReadTx))
		},
		reset, true,
	)
}

// Update opens a database read/write transaction and executes the function f
// with the transaction passed as a parameter. After f exits, if f did not
// error, the transaction is committed. Otherwise, if f did error, the
// transaction is rolled back. If the rollback fails, the original error
// returned by f is still returned. If the commit fails, the commit error is
// returned. As callers may expect retries of the f closure, the reset function
// will be called before each retry respectively.
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) (err error) {
	return db.executeTransaction(f, reset, false)
}
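For callers, the backend is used through the usual walletdb closure API. A short usage sketch, where backend is assumed to be the *db returned by newPostgresBackend and the bucket and key names are made up for illustration:

```go
// Write a key inside a read/write transaction.
err := backend.Update(func(tx walletdb.ReadWriteTx) error {
	bucket, err := tx.CreateTopLevelBucket([]byte("example-bucket"))
	if err != nil {
		return err
	}

	return bucket.Put([]byte("key"), []byte("value"))
}, func() {})
if err != nil {
	log.Errorf("update failed: %v", err)
}

// Read it back inside a read-only transaction.
err = backend.View(func(tx walletdb.ReadTx) error {
	value := tx.ReadBucket([]byte("example-bucket")).Get([]byte("key"))
	log.Infof("value: %x", value)

	return nil
}, func() {})
```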
// executeTransaction creates a new read-only or read-write transaction and
// executes the given function within it.
func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error,
	reset func(), readOnly bool) error {

	reset()

	tx, err := newReadWriteTx(db, readOnly)
	if err != nil {
		return err
	}

	err = catchPanic(func() error { return f(tx) })
	if err != nil {
		if rollbackErr := tx.Rollback(); rollbackErr != nil {
			log.Errorf("Error rolling back tx: %v", rollbackErr)
		}

		return err
	}

	return tx.Commit()
}

// PrintStats returns all collected stats pretty printed into a string.
func (db *db) PrintStats() string {
	return "stats not supported by Postgres driver"
}

// BeginReadWriteTx opens a database read+write transaction.
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
	return newReadWriteTx(db, false)
}

// BeginReadTx opens a database read transaction.
func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
	return newReadWriteTx(db, true)
}

// Copy writes a copy of the database to the provided writer. This call will
// start a read-only transaction to perform all operations.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Copy(w io.Writer) error {
	return errors.New("not implemented")
}

// Close cleanly shuts down the database and syncs all data.
// This function is part of the walletdb.Db interface implementation.
func (db *db) Close() error {
	return db.db.Close()
}
@@ -0,0 +1,56 @@
// +build kvdb_postgres

package postgres
import (
	"testing"
	"time"

	"github.com/btcsuite/btcwallet/walletdb"
	"github.com/btcsuite/btcwallet/walletdb/walletdbtest"
	"github.com/stretchr/testify/require"
	"golang.org/x/net/context"
)

// TestInterface performs all interfaces tests for this database driver.
func TestInterface(t *testing.T) {
	f := NewFixture(t)
	defer f.Cleanup()

	// dbType is the database type name for this driver.
	const dbType = "postgres"

	ctx := context.Background()
	cfg := &Config{
		Dsn: testDsn,
	}

	walletdbtest.TestInterface(t, dbType, ctx, cfg, prefix)
}

// TestPanic tests recovery from panic conditions.
func TestPanic(t *testing.T) {
	f := NewFixture(t)
	defer f.Cleanup()

	d := f.NewBackend()

	err := d.(*db).Update(func(tx walletdb.ReadWriteTx) error {
		bucket, err := tx.CreateTopLevelBucket([]byte("test"))
		require.NoError(t, err)

		// Stop database server.
		f.Cleanup()

		// Keep trying to get data until Get panics because the
		// connection is lost.
		for i := 0; i < 50; i++ {
			bucket.Get([]byte("key"))
			time.Sleep(100 * time.Millisecond)
		}

		return nil
	}, func() {})

	require.Contains(t, err.Error(), "terminating connection")
}
Review comment: I think you want separate lines for these tags to test them individually.
Reply: Some more context: when we add the tag kvdb_etcd, we also run the channeldb unit tests on etcd. The trick is that there's a GetTestBackend() function in kvdb/backend.go, and there's a constant behind the tag in kvdb/kvdb_etcd.go and kvdb/kvdb_no_etcd.go that defines which backend to start with GetTestBackend(). Looking at it again, I now think having the tags passed together works, but it'll still run the channeldb tests on etcd. You may want to fix this such that those tests also run on Postgres.
Reply: I ran the channeldb tests on postgres and they all pass except for three specialty tests. Two fail because of assumptions about the db path (which do not apply to postgres) and one because of panic bubbling that is different in Postgres. Needs a closer look. My proposal is to do that work in a separate PR where we can also evaluate the additional run time for a test suite that is already long-running. For now, I at least have manual confirmation that the channeldb tests do pass on Postgres. Let me know your thoughts.
Reply: Running the postgres tests as a separate suite sounds good to me. If that needs some more work because of special cases, I agree it should be done in a separate PR. Knowing they pass (with just three explainable exceptions) is important though. @joostjager can you create an issue please so we don't forget?
Reply: I've pushed the code that I used here: #5550. I think this can double as an issue?
Reply: Isn't it the case that tacking on this extra tag doesn't actually do anything in practice? I'm guessing Go has some ordering tie-breaker here that causes it to only use the first tag? In any case, I think we want these to run separately so we can catch distinct regressions for both backends.
Reply: Are these tests still failing?
Reply: They are running separately. There is kvdb/etcd_test.go and kvdb/postgres_test.go. Both build tags are set to have all code available in the test binary. Still failing; I haven't continued the work on that PR #5550 above.
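For readers unfamiliar with the tag-gated selection described above, the pattern looks roughly like the following pair of files; the file names, constant name, and values here are illustrative assumptions, not lnd's actual kvdb code.

```go
// Hypothetical kvdb/test_postgres.go, compiled only with the kvdb_postgres tag.
// +build kvdb_postgres

package kvdb

// TestBackend tells GetTestBackend which driver to spin up for unit tests.
const TestBackend = "postgres"
```

```go
// Hypothetical kvdb/test_no_postgres.go, compiled when the tag is absent.
// +build !kvdb_postgres

package kvdb

// Without the tag, tests fall back to the default bolt backend.
const TestBackend = "bdb"
```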