Skip to content

Commit

Permalink
Enable batching in destination (#104)
Browse files Browse the repository at this point in the history
* enable batching in destination

* update sdk to include batching middleware

* add unit test

* update docs for handler methods
  • Loading branch information
lovromazgon committed Jul 31, 2023
1 parent 84613e7 commit cfcc81f
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 41 deletions.
91 changes: 53 additions & 38 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,34 @@ func (d *Destination) Open(ctx context.Context) error {
// Write routes incoming records to their appropriate handler based on the
// operation.
func (d *Destination) Write(ctx context.Context, recs []sdk.Record) (int, error) {
for i, r := range recs {
// TODO send all queries to postgres in one round-trip
err := sdk.Util.Destination.Route(ctx, r,
d.handleInsert,
d.handleUpdate,
d.handleDelete,
d.handleInsert,
)
b := &pgx.Batch{}
for _, rec := range recs {
var err error
switch rec.Operation {
case sdk.OperationCreate:
err = d.handleInsert(rec, b)
case sdk.OperationUpdate:
err = d.handleUpdate(rec, b)
case sdk.OperationDelete:
err = d.handleDelete(rec, b)
case sdk.OperationSnapshot:
err = d.handleInsert(rec, b)
default:
return 0, fmt.Errorf("invalid operation %q", rec.Operation)
}
if err != nil {
return i, err
return 0, err
}
}

br := d.conn.SendBatch(ctx, b)
defer br.Close()

for i := range recs {
_, err := br.Exec()
if err != nil {
// the batch is executed in a transaction, so if one query fails they all fail
return 0, fmt.Errorf("failed to execute query for record %d: %w", i, err)
}
}
return len(recs), nil
Expand All @@ -121,33 +139,37 @@ func (d *Destination) Teardown(ctx context.Context) error {
return nil
}

// handleInsert checks for the existence of a key. If no key is present it will
// plainly insert the data. If a key exists, but no key column name is
// configured, it attempts a plain insert to that database.
func (d *Destination) handleInsert(ctx context.Context, r sdk.Record) error {
// handleInsert adds a query to the batch that stores the record in the target
// table. It checks for the existence of a key. If no key is present or a key
// exists and no key column name is configured, it will plainly insert the data.
// Otherwise it upserts the record.
func (d *Destination) handleInsert(r sdk.Record, b *pgx.Batch) error {
if !d.hasKey(r) || d.config.keyColumnName == "" {
return d.insert(ctx, r)
return d.insert(r, b)
}
return d.upsert(ctx, r)
return d.upsert(r, b)
}

// handleUpdate assumes the record has a key and will fail if one is not present
func (d *Destination) handleUpdate(ctx context.Context, r sdk.Record) error {
// handleUpdate adds a query to the batch that updates the record in the target
// table. It assumes the record has a key and fails if one is not present.
func (d *Destination) handleUpdate(r sdk.Record, b *pgx.Batch) error {
if !d.hasKey(r) {
return fmt.Errorf("key must be provided on update actions")
}
// TODO handle case if the key was updated
return d.upsert(ctx, r)
return d.upsert(r, b)
}

func (d *Destination) handleDelete(ctx context.Context, r sdk.Record) error {
// handleDelete adds a query to the batch that deletes the record from the
// target table. It assumes the record has a key and fails if one is not present.
func (d *Destination) handleDelete(r sdk.Record, b *pgx.Batch) error {
if !d.hasKey(r) {
return fmt.Errorf("key must be provided on delete actions")
}
return d.remove(ctx, r)
return d.remove(r, b)
}

func (d *Destination) upsert(ctx context.Context, r sdk.Record) error {
func (d *Destination) upsert(r sdk.Record, b *pgx.Batch) error {
payload, err := d.getPayload(r)
if err != nil {
return fmt.Errorf("failed to get payload: %w", err)
Expand All @@ -170,15 +192,11 @@ func (d *Destination) upsert(ctx context.Context, r sdk.Record) error {
return fmt.Errorf("error formatting query: %w", err)
}

_, err = d.conn.Exec(ctx, query, args...)
if err != nil {
return fmt.Errorf("insert exec failed: %w", err)
}

b.Queue(query, args...)
return nil
}

func (d *Destination) remove(ctx context.Context, r sdk.Record) error {
func (d *Destination) remove(r sdk.Record, b *pgx.Batch) error {
key, err := d.getKey(r)
if err != nil {
return err
Expand All @@ -195,14 +213,15 @@ func (d *Destination) remove(ctx context.Context, r sdk.Record) error {
if err != nil {
return fmt.Errorf("error formatting delete query: %w", err)
}
_, err = d.conn.Exec(ctx, query, args...)
return err

b.Queue(query, args...)
return nil
}

// insert is an append-only operation that doesn't care about keys, but it
// can error on constraint violations, so it should only be used when no table
// key or unique constraints are otherwise present.
func (d *Destination) insert(ctx context.Context, r sdk.Record) error {
func (d *Destination) insert(r sdk.Record, b *pgx.Batch) error {
tableName, err := d.getTableName(r.Metadata)
if err != nil {
return err
Expand All @@ -224,8 +243,9 @@ func (d *Destination) insert(ctx context.Context, r sdk.Record) error {
if err != nil {
return fmt.Errorf("error formatting insert query: %w", err)
}
_, err = d.conn.Exec(ctx, query, args...)
return err

b.Queue(query, args...)
return nil
}

func (d *Destination) getPayload(r sdk.Record) (sdk.StructuredData, error) {
Expand Down Expand Up @@ -294,17 +314,12 @@ func (d *Destination) formatUpsertQuery(

colArgs, valArgs := formatColumnsAndValues(key, payload)

query, args, err := d.stmtBuilder.
return d.stmtBuilder.
Insert(tableName).
Columns(colArgs...).
Values(valArgs...).
SuffixExpr(sq.Expr(upsertQuery)).
ToSql()
if err != nil {
return "", nil, fmt.Errorf("error formatting query: %w", err)
}

return query, args, nil
}

// formatColumnsAndValues turns the key and payload into a slice of ordered
Expand Down
62 changes: 62 additions & 0 deletions destination/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,68 @@ func TestDestination_Write(t *testing.T) {
}
}

// TestDestination_Batch sends three create records through a single call to
// Write and then reads every row back from the test table, expecting each one
// to match the payload that was written.
//
// NOTE(review): presumably needs a reachable database behind
// test.RegularConnString — confirm against the test helpers.
func TestDestination_Batch(t *testing.T) {
	assert := is.New(t)
	ctx := context.Background()
	conn := test.ConnectSimple(ctx, t, test.RegularConnString)
	tableName := test.SetupTestTable(ctx, t, conn)

	// Configure and open the destination against the freshly created table.
	d := NewDestination()
	cfg := map[string]string{
		"url":   test.RegularConnString,
		"table": tableName,
	}
	assert.NoErr(d.Configure(ctx, cfg))
	assert.NoErr(d.Open(ctx))
	defer func() {
		assert.NoErr(d.Teardown(ctx))
	}()

	// newCreate builds a create record keyed by id carrying the three test
	// columns in its After payload.
	newCreate := func(pos string, id int, col1 string, col2 int, col3 bool) sdk.Record {
		return sdk.Record{
			Position:  sdk.Position(pos),
			Operation: sdk.OperationCreate,
			Key:       sdk.StructuredData{"id": id},
			Payload: sdk.Change{
				After: sdk.StructuredData{
					"column1": col1,
					"column2": col2,
					"column3": col3,
				},
			},
		}
	}

	records := []sdk.Record{
		newCreate("foo1", 5, "foo1", 1, false),
		newCreate("foo2", 6, "foo2", 2, true),
		newCreate("foo3", 7, "foo3", 3, false),
	}

	// All records go out in one Write call; the reported count must cover
	// the whole slice.
	written, err := d.Write(ctx, records)
	assert.NoErr(err)
	assert.Equal(written, len(records))

	// Every record must be retrievable by its key and equal what was sent.
	for idx := range records {
		want := records[idx]
		id := want.Key.(sdk.StructuredData)["id"]

		got, err := queryTestTable(ctx, conn, tableName, id)
		assert.NoErr(err)
		assert.Equal(want.Payload.After, got)
	}
}

func queryTestTable(ctx context.Context, conn test.Querier, tableName string, id any) (sdk.StructuredData, error) {
row := conn.QueryRow(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/Masterminds/squirrel v1.5.4
github.com/conduitio/conduit-connector-sdk v0.7.2
github.com/conduitio/conduit-connector-sdk v0.7.2-0.20230726125302-4374355412f2
github.com/jackc/pgconn v1.14.1
github.com/jackc/pglogrepl v0.0.0-20220305000529-420b8467887a
github.com/jackc/pgproto3/v2 v2.3.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/conduitio/conduit-connector-protocol v0.5.0 h1:Rr2SsDAvWDryQArvonwPoXBELQA2wRXr49xBLrAtBaM=
github.com/conduitio/conduit-connector-protocol v0.5.0/go.mod h1:UIhHWxq52hvwwbkvQDaRgZRHfbpDDmU7tZaw0mwLdd4=
github.com/conduitio/conduit-connector-sdk v0.7.2 h1:M1IDbtAfWict3PaSi5edclhW4VuRPkkFsL5hRihlmCk=
github.com/conduitio/conduit-connector-sdk v0.7.2/go.mod h1:OakeXjAsPyHotUBRXXDlrV+/v/1zMkoEgozPNiNJo9Q=
github.com/conduitio/conduit-connector-sdk v0.7.2-0.20230726125302-4374355412f2 h1:hnvdLQ8/mlZrthkgtaqqAsYmSXlQYM1SLDPQAr4dAVE=
github.com/conduitio/conduit-connector-sdk v0.7.2-0.20230726125302-4374355412f2/go.mod h1:OakeXjAsPyHotUBRXXDlrV+/v/1zMkoEgozPNiNJo9Q=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down

0 comments on commit cfcc81f

Please sign in to comment.