Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix operation for snapshots and replace use of information_schema #154

Merged
merged 1 commit into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/golangci/golangci-lint v1.58.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
github.com/matryer/is v1.4.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s=
Expand Down
10 changes: 3 additions & 7 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,9 @@ func (s *Source) getAllTables(ctx context.Context) ([]string, error) {
// getPrimaryKey queries the db for the name of the primary key column for a
// table if one exists and returns it.
func (s *Source) getPrimaryKey(ctx context.Context, tableName string) (string, error) {
query := `SELECT c.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name)
JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema
AND tc.table_name = c.table_name AND ccu.column_name = c.column_name
WHERE constraint_type = 'PRIMARY KEY' AND tc.table_schema = 'public'
AND tc.table_name = $1`
query := `SELECT a.attname FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass AND i.indisprimary`

rows, err := s.pool.Query(ctx, query, tableName)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions source/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,4 @@ type Iterator interface {
Teardown(context.Context) error
}

var (
_ Iterator = (*logrepl.CDCIterator)(nil)
)
var _ Iterator = (*logrepl.CDCIterator)(nil)
5 changes: 2 additions & 3 deletions source/logrepl/internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ package internal
import (
"errors"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
)

const pgDuplicateObjectErrorCode = "42710"

func IsPgDuplicateErr(err error) bool {
var pgerr *pgconn.PgError
return errors.As(err, &pgerr) && pgerr.Code == pgDuplicateObjectErrorCode
return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.DuplicateObject
}
13 changes: 7 additions & 6 deletions source/snapshot/fetch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,9 @@ func (*FetchWorker) validateKey(ctx context.Context, table, key string, tx pgx.T

if err := tx.QueryRow(
ctx,
"SELECT data_type FROM information_schema.columns WHERE table_name=$1 AND column_name=$2",
table, key,
`SELECT a.atttypid::regtype AS type FROM pg_class c JOIN pg_attribute a ON c.oid = a.attrelid
WHERE c.relkind = 'r' AND a.attname = $1 AND c.relname = $2`,
key, table,
).Scan(&dataType); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("key %q not present on table %q", key, table)
Expand All @@ -379,12 +380,12 @@ func (*FetchWorker) validateKey(ctx context.Context, table, key string, tx pgx.T

var isPK bool

// As per https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
if err := tx.QueryRow(
ctx,
`SELECT EXISTS(SELECT tc.constraint_type
FROM information_schema.constraint_column_usage cu JOIN information_schema.table_constraints tc
ON tc.constraint_name = cu.constraint_name
WHERE cu.table_name=$1 AND cu.column_name=$2)`,
`SELECT EXISTS(SELECT a.attname FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass AND a.attname = $2 AND i.indisprimary)`,
table, key,
).Scan(&isPK); err != nil {
return fmt.Errorf("unable to determine key %q constraints: %w", key, err)
Expand Down
9 changes: 5 additions & 4 deletions source/snapshot/fetch_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,11 @@ func Test_FetcherValidate(t *testing.T) {

err := f.Validate(ctx)
is.True(err != nil)
is.True(strings.Contains(
err.Error(),
fmt.Sprintf(`key "missing_key" not present on table %q`, table),
))
ok := strings.Contains(err.Error(), fmt.Sprintf(`key "missing_key" not present on table %q`, table))
if !ok {
t.Logf("error: %s", err.Error())
}
is.True(ok)
})
}

Expand Down
2 changes: 1 addition & 1 deletion source/snapshot/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (i *Iterator) buildRecord(d FetchData) sdk.Record {
metadata := make(sdk.Metadata)
metadata["postgres.table"] = d.Table

return sdk.Util.Source.NewRecordCreate(pos, metadata, d.Key, d.Payload)
return sdk.Util.Source.NewRecordSnapshot(pos, metadata, d.Key, d.Payload)
}

func (i *Iterator) initFetchers(ctx context.Context) error {
Expand Down
5 changes: 4 additions & 1 deletion source/snapshot/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/conduitio/conduit-connector-postgres/source/position"
"github.com/conduitio/conduit-connector-postgres/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
)

Expand All @@ -48,9 +49,11 @@ func Test_Iterator_Next(t *testing.T) {
}()

for j := 1; j <= 4; j++ {
_, err = i.Next(ctx)
r, err := i.Next(ctx)
is.NoErr(err)
is.Equal(r.Operation, sdk.OperationSnapshot)
}

for j := 1; j <= 4; j++ {
err = i.Ack(ctx, nil)
is.NoErr(err)
Expand Down