Skip to content

Commit

Permalink
New snapshot iterator and position format (#132)
Browse files Browse the repository at this point in the history
* New snapshot iterator and position format

The iterator supports snapshots based on range and is limited to tables
which have an ordinal number (integer) as primary key. If these conditions are
not met, validation will fail.

Support for multiple tables is built in. Each table gets a dedicated worker.
Positional data keeps track of the progress of all tables, not just the one which the record
is related to.

Resuming a snapshot is supported, if the snapshot positional data requires it.

This work is not integrated into the CDC or LongPoll iterator and the new position type
is not compatible. That work will follow.

* remove done channel

* position test; add header and fix linter

* fmt

* tests; adjust pg config for testing

* bump go

* add stringer

* pass db pool as param to iterator

* attach methods to receiver; rename fetcherworker to fetchworker; remove usage of sprint

* add validation for key not being numeric

* cr changes

* remove fmt, one type of params is enough

* lint

* simplifications

* include context in tomb

* return snapshot position explicitly in fetcher

* fix linter warning

* set correct position type

* make test assertion more stable

* document LSN

* rename snapshot to snapshots

---------

Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com>
  • Loading branch information
lyuboxa and lovromazgon committed Apr 18, 2024
1 parent e7c451c commit 8062fb6
Show file tree
Hide file tree
Showing 19 changed files with 1,493 additions and 111 deletions.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@ test:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose.yml up --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose.yml down; \
docker compose -f test/docker-compose.yml down --volumes; \
exit $$ret

.PHONY: lint
lint:
golangci-lint run
golangci-lint run -v

.PHONY: generate
generate:
go generate ./...

.PHONY: fmt
fmt:
gofumpt -l -w .
gci write --skip-generated .

.PHONY: install-tools
install-tools:
@echo Installing tools from tools.go
Expand Down
121 changes: 61 additions & 60 deletions destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,75 +50,76 @@ func TestDestination_Write(t *testing.T) {
tests := []struct {
name string
record sdk.Record
}{{
name: "snapshot",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationSnapshot,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5000},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 123,
"column3": true,
}{
{
name: "snapshot",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationSnapshot,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5000},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 123,
"column3": true,
},
},
},
},
}, {
name: "create",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationCreate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 456,
"column3": false,
}, {
name: "create",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationCreate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foo",
"column2": 456,
"column3": false,
},
},
},
},
}, {
name: "insert on update (upsert)",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 6},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "bar",
"column2": 567,
"column3": true,
}, {
name: "insert on update (upsert)",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 6},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "bar",
"column2": 567,
"column3": true,
},
},
},
},
}, {
name: "update on conflict",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 1},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foobar",
"column2": 567,
"column3": true,
}, {
name: "update on conflict",
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 1},
Payload: sdk.Change{
After: sdk.StructuredData{
"column1": "foobar",
"column2": 567,
"column3": true,
},
},
},
}, {
name: "delete",
record: sdk.Record{
Position: sdk.Position("foo"),
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Operation: sdk.OperationDelete,
Key: sdk.StructuredData{"id": 4},
},
},
}, {
name: "delete",
record: sdk.Record{
Position: sdk.Position("foo"),
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Operation: sdk.OperationDelete,
Key: sdk.StructuredData{"id": 4},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
module github.com/conduitio/conduit-connector-postgres

go 1.21
go 1.22

require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/Masterminds/squirrel v1.5.4
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/daixiang0/gci v0.12.3
github.com/golangci/golangci-lint v1.57.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/matryer/is v1.4.1
golang.org/x/tools v0.19.0
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
mvdan.cc/gofumpt v0.6.0
)

require (
Expand Down Expand Up @@ -50,7 +55,6 @@ require (
github.com/ckaznocha/intrange v0.1.1 // indirect
github.com/conduitio/conduit-connector-protocol v0.5.0 // indirect
github.com/curioswitch/go-reassign v0.2.0 // indirect
github.com/daixiang0/gci v0.12.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.5.0 // indirect
github.com/ettle/strcase v0.2.0 // indirect
Expand Down Expand Up @@ -80,7 +84,6 @@ require (
github.com/golangci/plugin-module-register v0.1.1 // indirect
github.com/golangci/revgrep v0.5.2 // indirect
github.com/golangci/unconvert v0.0.0-20240309020433-c5143eacb3ed // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gordonklaus/ineffassign v0.1.0 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect
Expand Down Expand Up @@ -209,15 +212,12 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.4.7 // indirect
mvdan.cc/gofumpt v0.6.0 // indirect
mvdan.cc/unparam v0.0.0-20240104100049-c549a3470d14 // indirect
)
1 change: 1 addition & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
}
return nil
}

func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
conn, err := pgx.Connect(ctx, s.config.URL)
if err != nil {
Expand Down
73 changes: 37 additions & 36 deletions source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,44 @@ func TestConfig_Validate(t *testing.T) {
name string
cfg Config
wantErr bool
}{{
name: "valid config",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
}{
{
name: "valid config",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: false,
}, {
name: "invalid postgres url",
cfg: Config{
URL: "postgresql",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
}, {
name: "invalid multiple tables for long polling",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLongPolling,
},
wantErr: true,
}, {
name: "invalid key list format",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"key1,key2"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
},
wantErr: false,
}, {
name: "invalid postgres url",
cfg: Config{
URL: "postgresql",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
}, {
name: "invalid multiple tables for long polling",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLongPolling,
},
wantErr: true,
}, {
name: "invalid key list format",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"key1,key2"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion source/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/conduitio/conduit-connector-postgres/source/logrepl"
"github.com/conduitio/conduit-connector-postgres/source/longpoll"

sdk "github.com/conduitio/conduit-connector-sdk"
)

Expand Down
3 changes: 2 additions & 1 deletion source/logrepl/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func createTestSnapshot(ctx context.Context, t *testing.T, pool *pgxpool.Pool) s

// creates a snapshot iterator for testing that handles its connection's cleanup.
func createTestSnapshotIterator(ctx context.Context, t *testing.T,
pool *pgxpool.Pool, cfg SnapshotConfig) *SnapshotIterator {
pool *pgxpool.Pool, cfg SnapshotConfig,
) *SnapshotIterator {
is := is.New(t)

conn, err := pool.Acquire(ctx)
Expand Down
Loading

0 comments on commit 8062fb6

Please sign in to comment.