Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): read from multiple tables #129

Merged
merged 32 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c6bd8a5
feat: read from multiple tables
raulb Apr 8, 2024
ae3d4b7
use sdk constant
raulb Apr 8, 2024
ce8af71
feat(longpoll): use opencdc.collection
raulb Apr 9, 2024
648415d
read from all tables
raulb Apr 9, 2024
fec84ff
update paramgen.go
raulb Apr 9, 2024
33a1bce
update readme
raulb Apr 9, 2024
e60e467
restore readme
raulb Apr 10, 2024
72aacd4
read columns from multiple tables
raulb Apr 10, 2024
c27ee2b
fix typo
raulb Apr 10, 2024
ad94965
restore
raulb Apr 16, 2024
ffc6b98
leftovers from #131
raulb Apr 16, 2024
69d71db
more leftovers
raulb Apr 16, 2024
a31e299
update readme on destination
raulb Apr 16, 2024
79215e4
update
raulb Apr 16, 2024
ed06502
update readme
raulb Apr 16, 2024
0fb3033
update comment
raulb Apr 16, 2024
8562ffb
update comment
raulb Apr 17, 2024
6dd20ff
use opencdc constant
raulb Apr 17, 2024
157a556
start making changes on readme
raulb Apr 17, 2024
a89db39
address after rebase
raulb Apr 19, 2024
84c5322
wip
raulb Apr 23, 2024
b694c8c
wip
raulb Apr 23, 2024
8db0932
tie snapshot iterator into longpolling mode
lovromazgon Apr 24, 2024
cd0badd
Merge branch 'main' into raul/read-multiple-tables
lovromazgon Apr 24, 2024
109b80d
get table key if not supplied manually
lovromazgon Apr 24, 2024
de7f2d3
ensure snapshot iterator waits for all acks
lovromazgon Apr 24, 2024
0c580d7
fix snapshot iterator tests
lovromazgon Apr 24, 2024
417ae97
add test for ensuring iterator waits for acks
lovromazgon Apr 24, 2024
9d91190
fix cdc tests
lovromazgon Apr 24, 2024
8fc7331
undo changes on readme
raulb Apr 24, 2024
c7fc5b5
address PR review
lovromazgon Apr 24, 2024
eebc0ab
use pg_tables
lovromazgon Apr 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 14 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,21 @@ can't be determined it will fail.

## Configuration Options

| name | description | required | default |
|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | List of table names to read from, separated by comma. Example: `"employees,offices,payments"` | true | |
| `key` | List of Key column names per table, separated by comma. Example:`"table1:key1,table2:key2"`, if not supplied, the table primary key will be used as the `'Key'` field for the records. | false | |
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |
| name | description | required | default |
|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | List of table names to read from, separated by comma. Using `*` will read from all public tables. | true | |
| `key` | List of Key column names per table, separated by comma. Example:`"table1:key1,table2:key2"`, if not supplied, the table(s) primary keys will be used as the `'Key'` field for the records. | false | |
raulb marked this conversation as resolved.
Show resolved Hide resolved
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |

# Destination

The Postgres Destination takes a `record.Record` and parses it into a valid SQL query. The Destination is designed to
handle different payloads and keys. Because of this, each record is individually parsed and upserted.

## Table Name

If a record contains a `postgres.table` property in its metadata it will be inserted in that table, otherwise it will
fall back to use the table configured in the connector. This way the Destination can support multiple tables in the same
connector.

This is especially important in a pipeline where the source is also a Postgres connector, as the source will include the
`postgres.table` field in the metadata of each record. If you want to reroute the records to a different table, you have
to modify the `postgres.table` field in the record's metadata using a processor.

## Upsert Behavior

If the target table already contains a record with the same key, the Destination will upsert with its current received
Expand All @@ -87,14 +77,11 @@ If there is no key, the record will be simply appended.

## Configuration Options

| name | description | required | default |
|---------|-----------------------------------------------------------------------------|----------|---------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | The name of the table in Postgres that the connector should write to.* | false | |
| `key` | Column name used to detect if the target table already contains the record. | false | |

*Note that the `postgres.table` field in the record's metadata will override the `table` property in the destination's
configuration. Please refer to [Table Name](#table-name) for more information.
| name | description | required | default |
|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------------------------------------------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | Table name. It can contain a Go template that will be executed for each record to determine the table. By default, the table is the value of the `opencdc.collection` metadata field. | false | `{{ index .Metadata "opencdc.collection" }}` |
| `key` | Column name used to detect if the target table already contains the record. | false | |

# Testing

Expand Down
6 changes: 0 additions & 6 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ import (
"github.com/jackc/pgx/v5"
)

const (
// TODO same constant is defined in packages longpoll, logrepl and destination
// use same constant everywhere
MetadataOpenCDCCollection = "opencdc.collection"
)

type Destination struct {
sdk.UnimplementedDestination

Expand Down
10 changes: 5 additions & 5 deletions destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationSnapshot,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 5000},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -71,7 +71,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationCreate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 5},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -86,7 +86,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 6},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -101,7 +101,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Metadata: map[string]string{sdk.MetadataCollection: tableName},
Key: sdk.StructuredData{"id": 1},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -115,7 +115,7 @@ func TestDestination_Write(t *testing.T) {
name: "delete",
record: sdk.Record{
Position: sdk.Position("foo"),
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Metadata: map[string]string{sdk.MetadataCollection: tableName},
Operation: sdk.OperationDelete,
Key: sdk.StructuredData{"id": 4},
},
Expand Down
25 changes: 12 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ go 1.22
require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/Masterminds/squirrel v1.5.4
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/conduitio/conduit-commons v0.1.2-0.20240405195636-cb5e072472b0
github.com/conduitio/conduit-connector-sdk v0.8.1-0.20240408123504-cec49fc57887
github.com/daixiang0/gci v0.13.4
github.com/golangci/golangci-lint v1.57.2
github.com/google/go-cmp v0.6.0
Expand Down Expand Up @@ -53,7 +54,7 @@ require (
github.com/charithe/durationcheck v0.0.10 // indirect
github.com/chavacava/garif v0.1.0 // indirect
github.com/ckaznocha/intrange v0.1.1 // indirect
github.com/conduitio/conduit-connector-protocol v0.5.0 // indirect
github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240408121719-ffe7a46af296 // indirect
github.com/curioswitch/go-reassign v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.5.0 // indirect
Expand All @@ -77,7 +78,7 @@ require (
github.com/gobwas/glob v0.2.3 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a // indirect
github.com/golangci/gofmt v0.0.0-20231018234816-f50ced29576e // indirect
github.com/golangci/misspell v0.4.1 // indirect
Expand All @@ -89,8 +90,8 @@ require (
github.com/gostaticanalysis/comment v1.4.2 // indirect
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
github.com/hashicorp/go-hclog v1.3.1 // indirect
github.com/hashicorp/go-plugin v1.4.5 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-plugin v1.6.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
Expand All @@ -103,7 +104,6 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jgautheron/goconst v1.7.1 // indirect
github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3 // indirect
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/jjti/go-spancheck v0.5.3 // indirect
Expand Down Expand Up @@ -155,7 +155,7 @@ require (
github.com/quasilyte/gogrep v0.5.0 // indirect
github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect
github.com/rs/zerolog v1.31.0 // indirect
github.com/rs/zerolog v1.32.0 // indirect
github.com/ryancurrah/gomodguard v1.3.1 // indirect
github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect
github.com/sanposhiho/wastedassign/v2 v2.0.7 // indirect
Expand All @@ -172,7 +172,7 @@ require (
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.12.0 // indirect
Expand All @@ -198,22 +198,21 @@ require (
gitlab.com/bosi/decorder v0.4.1 // indirect
go-simpler.org/musttag v0.9.0 // indirect
go-simpler.org/sloglint v0.5.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa // indirect
google.golang.org/grpc v1.63.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading