From a94b201709335414fb9d1b0478a97f00421d8fef Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Tue, 26 Mar 2024 11:35:10 +0100 Subject: [PATCH] Limited sqlx.DB exposure (for the sake of the future pgx rewrite) --- consumer.go | 7 ++++++- example_consumer_test.go | 4 ++-- example_publisher_test.go | 4 ++-- examples_test.go | 4 ++-- integtest/consumer_test.go | 7 +++---- publisher.go | 7 ++++++- 6 files changed, 21 insertions(+), 12 deletions(-) diff --git a/consumer.go b/consumer.go index 05755fb..9229b85 100644 --- a/consumer.go +++ b/consumer.go @@ -251,7 +251,12 @@ func WithMetadataFilter(filter *MetadataFilter) ConsumerOption { } // NewConsumer creates Consumer with proper settings -func NewConsumer(db *sqlx.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error) { +func NewConsumer(db *sql.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error) { + return NewConsumerExt(sqlx.NewDb(db, "pgx"), queueName, handler, opts...) +} + +// NewConsumer creates Consumer with proper settings, using sqlx.DB (until refactored to use pgx directly) +func NewConsumerExt(db *sqlx.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error) { config := defaultConsumerConfig for _, opt := range opts { opt(&config) diff --git a/example_consumer_test.go b/example_consumer_test.go index 8ce7047..095924a 100644 --- a/example_consumer_test.go +++ b/example_consumer_test.go @@ -2,6 +2,7 @@ package pgq_test import ( "context" + "database/sql" "encoding/json" "errors" "fmt" @@ -9,7 +10,6 @@ import ( "os" "os/signal" - "github.com/jmoiron/sqlx" "go.dataddo.com/pgq" ) @@ -48,7 +48,7 @@ func (h *Handler) HandleMessage(ctx context.Context, msg *pgq.MessageIncoming) ( } func ExampleConsumer() { - db, err := sqlx.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres") + db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres") if err != nil { log.Fatal("Error opening database:", err) } diff --git a/example_publisher_test.go b/example_publisher_test.go index 3e3f39f..32f6ac9 100644 --- a/example_publisher_test.go +++ b/example_publisher_test.go @@ -2,11 +2,11 @@ package pgq_test import ( "context" + "database/sql" "encoding/json" "log" "time" - "github.com/jmoiron/sqlx" "go.dataddo.com/pgq" ) @@ -15,7 +15,7 @@ type PayloadStruct struct { } func ExamplePublisher() { - db, err := sqlx.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres") + db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres") if err != nil { log.Fatal("Error opening database:", err) } diff --git a/examples_test.go b/examples_test.go index f16ec96..0c9dcca 100644 --- a/examples_test.go +++ b/examples_test.go @@ -2,16 +2,16 @@ package pgq_test import ( "context" + "database/sql" "log/slog" "os" "time" - "github.com/jmoiron/sqlx" "go.dataddo.com/pgq" "go.opentelemetry.io/otel/metric/noop" ) -var db *sqlx.DB +var db *sql.DB func ExampleNewConsumer() { slogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) diff --git a/integtest/consumer_test.go b/integtest/consumer_test.go index f1d6b1a..2a9622c 100644 --- a/integtest/consumer_test.go +++ b/integtest/consumer_test.go @@ -11,7 +11,6 @@ import ( "time" _ "github.com/jackc/pgx/v4/stdlib" - "github.com/jmoiron/sqlx" "go.opentelemetry.io/otel/metric/noop" . "go.dataddo.com/pgq" @@ -244,12 +243,12 @@ func TestConsumer_Run_MetadataFilter_NotEqual(t *testing.T) { } -func openDB(t *testing.T) *sqlx.DB { +func openDB(t *testing.T) *sql.DB { dsn, ok := os.LookupEnv("TEST_POSTGRES_DSN") if !ok { t.Skip("Skipping integration test, TEST_POSTGRES_DSN is not set") } - db, err := sqlx.Open("pgx", dsn) + db, err := sql.Open("pgx", dsn) require.NoError(t, err) t.Cleanup(func() { err := db.Close() @@ -259,7 +258,7 @@ func openDB(t *testing.T) *sqlx.DB { return db } -func ensureUUIDExtension(t *testing.T, db *sqlx.DB) { +func ensureUUIDExtension(t *testing.T, db *sql.DB) { _, err := db.Exec(` DO $$ BEGIN diff --git a/publisher.go b/publisher.go index 075d567..1e7f48b 100644 --- a/publisher.go +++ b/publisher.go @@ -51,7 +51,12 @@ func StaticMetaInjector(m Metadata) func(context.Context, Metadata) { } // NewPublisher initializes the publisher with given *sql.DB client. -func NewPublisher(db *sqlx.DB, opts ...PublisherOption) Publisher { +func NewPublisher(db *sql.DB, opts ...PublisherOption) Publisher { + return NewPublisherExt(sqlx.NewDb(db, "pgx"), opts...) +} + +// NewPublisher initializes the publisher with given *sqlx.DB client +func NewPublisherExt(db *sqlx.DB, opts ...PublisherOption) Publisher { cfg := publisherConfig{} for _, opt := range opts { opt(&cfg)