diff --git a/cmd/main.go b/cmd/main.go index a8b31a2..a0493f6 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,7 +8,7 @@ import ( "github.com/inngest/dbcap/pkg/changeset" "github.com/inngest/dbcap/pkg/eventwriter" - "github.com/inngest/dbcap/pkg/replicator/pg" + "github.com/inngest/dbcap/pkg/replicator/pgreplicator" "github.com/jackc/pgx/v5" ) @@ -26,7 +26,7 @@ func main() { panic(err) } - r, err := pg.Postgres(ctx, pg.PostgresOpts{ + r, err := pgreplicator.New(ctx, pgreplicator.Opts{ Config: *config, }) if err != nil { diff --git a/internal/test/pg_init.go b/internal/test/pg_init.go index 2c359fd..0afec73 100644 --- a/internal/test/pg_init.go +++ b/internal/test/pg_init.go @@ -6,7 +6,7 @@ import ( "strings" "testing" - "github.com/inngest/dbcap/pkg/replicator/pg/pgsetup" + "github.com/inngest/dbcap/pkg/replicator/pgreplicator/pgsetup" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/stretchr/testify/require" diff --git a/pkg/replicator/pg/pg.go b/pkg/replicator/pgreplicator/pg.go similarity index 98% rename from pkg/replicator/pg/pg.go rename to pkg/replicator/pgreplicator/pg.go index 87b2bca..b68ee98 100644 --- a/pkg/replicator/pg/pg.go +++ b/pkg/replicator/pgreplicator/pg.go @@ -1,4 +1,4 @@ -package pg +package pgreplicator import ( "context" @@ -54,7 +54,7 @@ type PostgresReplicator interface { Close(ctx context.Context) error } -type PostgresOpts struct { +type Opts struct { Config pgx.ConnConfig // WatermarkSaver saves the current watermark to local storage. This should be paired with a // WatermarkLoader to load offsets when the replicator restarts. @@ -68,8 +68,8 @@ type PostgresOpts struct { Log *slog.Logger } -// Postgres returns a new postgres replicator for a single postgres database. -func Postgres(ctx context.Context, opts PostgresOpts) (PostgresReplicator, error) { +// New returns a new postgres replicator for a single postgres database. +func New(ctx context.Context, opts Opts) (PostgresReplicator, error) { cfg := opts.Config // Ensure that we add "replication": "database" as a to the replication @@ -114,7 +114,7 @@ func Postgres(ctx context.Context, opts PostgresOpts) (PostgresReplicator, error type pg struct { // opts stores the initialization opts, including watermark functs - opts PostgresOpts + opts Opts // conn is the WAL streaming connection. Once replication starts, this // conn cannot be used for any queries. conn *pgx.Conn diff --git a/pkg/replicator/pg/pg_test.go b/pkg/replicator/pgreplicator/pg_test.go similarity index 95% rename from pkg/replicator/pg/pg_test.go rename to pkg/replicator/pgreplicator/pg_test.go index a1994c7..06a8fc6 100644 --- a/pkg/replicator/pg/pg_test.go +++ b/pkg/replicator/pgreplicator/pg_test.go @@ -1,4 +1,4 @@ -package pg +package pgreplicator import ( "context" @@ -32,7 +32,7 @@ func TestReplicationSlot(t *testing.T) { DisableCreateSlot: true, }) - r, err := Postgres(ctx, PostgresOpts{Config: cfg}) + r, err := New(ctx, Opts{Config: cfg}) require.NoError(t, err) _, err = r.ReplicationSlot(ctx) @@ -50,7 +50,7 @@ func TestReplicationSlot(t *testing.T) { DisableCreateSlot: true, }) - r, err := Postgres(ctx, PostgresOpts{Config: cfg}) + r, err := New(ctx, Opts{Config: cfg}) require.NoError(t, err) _, err = r.ReplicationSlot(ctx) @@ -78,7 +78,7 @@ func TestCommit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) c, cfg := test.StartPG(t, ctx, test.StartPGOpts{Version: v}) - r, err := Postgres(ctx, PostgresOpts{Config: cfg}) + r, err := New(ctx, Opts{Config: cfg}) require.NoError(t, err) // Set up event writer which listens to changes @@ -134,8 +134,8 @@ func TestInsert(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) c, conn := test.StartPG(t, ctx, test.StartPGOpts{Version: v}) - opts := PostgresOpts{Config: conn} - r, err := Postgres(ctx, opts) + opts := Opts{Config: conn} + r, err := New(ctx, opts) require.NoError(t, err) var ( @@ -223,7 +223,7 @@ func TestUpdateMany_ReplicaIdentityFull(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) c, connCfg := test.StartPG(t, ctx, test.StartPGOpts{Version: v}) - opts := PostgresOpts{Config: connCfg} + opts := Opts{Config: connCfg} // // Insert accounts before starting replication watching. This lets us @@ -234,7 +234,7 @@ func TestUpdateMany_ReplicaIdentityFull(t *testing.T) { Interval: 1 * time.Millisecond, }) - r, err := Postgres(ctx, opts) + r, err := New(ctx, opts) require.NoError(t, err) var ( @@ -374,7 +374,7 @@ func TestUpdateMany_DisableReplicaIdentityFull(t *testing.T) { Version: v, DisableReplicaIdentityFull: true, }) - opts := PostgresOpts{Config: connCfg} + opts := Opts{Config: connCfg} // // Insert accounts before starting replication watching. This lets us @@ -385,7 +385,7 @@ func TestUpdateMany_DisableReplicaIdentityFull(t *testing.T) { Interval: 1 * time.Millisecond, }) - r, err := Postgres(ctx, opts) + r, err := New(ctx, opts) require.NoError(t, err) var ( @@ -492,8 +492,8 @@ func TestConnectingWithoutLogicalReplicationFails(t *testing.T) { DisableCreateSlot: true, }) - opts := PostgresOpts{Config: conn} - r, err := Postgres(ctx, opts) + opts := Opts{Config: conn} + r, err := New(ctx, opts) require.NoError(t, err) err = r.Pull(ctx, nil) @@ -513,8 +513,8 @@ func TestConnectingWithoutReplicationSlotFails(t *testing.T) { DisableCreateSlot: true, }) - opts := PostgresOpts{Config: conn} - r, err := Postgres(ctx, opts) + opts := Opts{Config: conn} + r, err := New(ctx, opts) require.NoError(t, err) err = r.Pull(ctx, nil) @@ -534,8 +534,8 @@ func TestMultipleConectionsFail(t *testing.T) { }) // The first time we connect things should succeed. - opts := PostgresOpts{Config: conn} - r1, err := Postgres(ctx, opts) + opts := Opts{Config: conn} + r1, err := New(ctx, opts) require.NoError(t, err) wg := sync.WaitGroup{} @@ -549,7 +549,7 @@ func TestMultipleConectionsFail(t *testing.T) { <-time.After(50 * time.Millisecond) - r2, err := Postgres(ctx, opts) + r2, err := New(ctx, opts) require.NoError(t, err) err = r2.Pull(ctx, nil) diff --git a/pkg/replicator/pg/pgsetup/pgsetup.go b/pkg/replicator/pgreplicator/pgsetup/pgsetup.go similarity index 100% rename from pkg/replicator/pg/pgsetup/pgsetup.go rename to pkg/replicator/pgreplicator/pgsetup/pgsetup.go diff --git a/pkg/replicator/pg/txn_unwrapper.go b/pkg/replicator/pgreplicator/txn_unwrapper.go similarity index 98% rename from pkg/replicator/pg/txn_unwrapper.go rename to pkg/replicator/pgreplicator/txn_unwrapper.go index 229ffe1..60062f5 100644 --- a/pkg/replicator/pg/txn_unwrapper.go +++ b/pkg/replicator/pgreplicator/txn_unwrapper.go @@ -1,4 +1,4 @@ -package pg +package pgreplicator import ( "sync/atomic" diff --git a/pkg/replicator/pg/txn_unwrapper_test.go b/pkg/replicator/pgreplicator/txn_unwrapper_test.go similarity index 99% rename from pkg/replicator/pg/txn_unwrapper_test.go rename to pkg/replicator/pgreplicator/txn_unwrapper_test.go index 247cc22..6728f79 100644 --- a/pkg/replicator/pg/txn_unwrapper_test.go +++ b/pkg/replicator/pgreplicator/txn_unwrapper_test.go @@ -1,4 +1,4 @@ -package pg +package pgreplicator import ( "testing"