Skip to content

Commit

Permalink
Include before data in payload for updates and deletes; Fix avro rel …
Browse files Browse the repository at this point in the history
…schema in deletes (#177)

* Include before data in payload for updates and deletes; Fix avro rel schema in deletes

When `REPLICA IDENTITY` is set to `FULL`, old values are available to be included in the payload
for both `DELETE` and `UPDATE` operations and should be included.

Ensure nil is handled for UUID formatting.

* fix uuid func comment
  • Loading branch information
lyuboxa authored Jun 27, 2024
1 parent 5ecd48e commit d491b85
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 23 deletions.
126 changes: 109 additions & 17 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,20 @@ func TestCDCIterator_Next(t *testing.T) {
<-i.sub.Ready()

tests := []struct {
name string
setupQuery string
want sdk.Record
wantErr bool
name string
setup func(t *testing.T)
want sdk.Record
wantErr bool
}{
{
name: "should detect insert",
setupQuery: `INSERT INTO %s (id, column1, column2, column3, column4, column5)
VALUES (6, 'bizz', 456, false, 12.3, 14)`,
setup: func(t *testing.T) {
is := is.New(t)
query := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
VALUES (6, 'bizz', 456, false, 12.3, 14)`, table)
_, err := pool.Exec(ctx, query)
is.NoErr(err)
},
wantErr: false,
want: sdk.Record{
Operation: sdk.OperationCreate,
Expand All @@ -160,9 +165,12 @@ func TestCDCIterator_Next(t *testing.T) {
},
{
name: "should detect update",
setupQuery: `UPDATE %s
SET column1 = 'test cdc updates'
WHERE key = '1'`,
setup: func(t *testing.T) {
is := is.New(t)
query := fmt.Sprintf(`UPDATE %s SET column1 = 'test cdc updates' WHERE key = '1'`, table)
_, err := pool.Exec(ctx, query)
is.NoErr(err)
},
wantErr: false,
want: sdk.Record{
Operation: sdk.OperationUpdate,
Expand All @@ -171,7 +179,6 @@ func TestCDCIterator_Next(t *testing.T) {
},
Key: sdk.StructuredData{"id": int64(1)},
Payload: sdk.Change{
Before: nil, // TODO
After: sdk.StructuredData{
"id": int64(1),
"column1": "test cdc updates",
Expand All @@ -185,27 +192,112 @@ func TestCDCIterator_Next(t *testing.T) {
},
},
{
name: "should detect delete",
setupQuery: `DELETE FROM %s WHERE id = 3`,
wantErr: false,
name: "should detect full update",
setup: func(t *testing.T) {
is := is.New(t)
_, err := pool.Exec(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL", table))
is.NoErr(err)
query := fmt.Sprintf(`UPDATE %s SET column1 = 'test cdc full updates' WHERE key = '1'`, table)
_, err = pool.Exec(ctx, query)
is.NoErr(err)
},
wantErr: false,
want: sdk.Record{
Operation: sdk.OperationUpdate,
Metadata: map[string]string{
sdk.MetadataCollection: table,
},
Key: sdk.StructuredData{"id": int64(1)},
Payload: sdk.Change{
Before: sdk.StructuredData{
"id": int64(1),
"column1": "test cdc updates",
"column2": int32(123),
"column3": false,
"column4": 12.2,
"column5": int64(4),
"key": []uint8("1"),
},
After: sdk.StructuredData{
"id": int64(1),
"column1": "test cdc full updates",
"column2": int32(123),
"column3": false,
"column4": 12.2,
"column5": int64(4),
"key": []uint8("1"),
},
},
},
},
{
name: "should detect delete",
setup: func(t *testing.T) {
is := is.New(t)
_, err := pool.Exec(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY DEFAULT", table))
is.NoErr(err)
query := fmt.Sprintf(`DELETE FROM %s WHERE id = 4`, table)
_, err = pool.Exec(ctx, query)
is.NoErr(err)
},
wantErr: false,
want: sdk.Record{
Operation: sdk.OperationDelete,
Metadata: map[string]string{
sdk.MetadataCollection: table,
},
Key: sdk.StructuredData{"id": int64(4)},
Payload: sdk.Change{
Before: sdk.StructuredData{
"id": int64(4),
"column1": nil,
"column2": nil,
"column3": nil,
"column4": nil,
"column5": nil,
"key": nil,
},
},
},
},
{
name: "should detect full delete",
setup: func(t *testing.T) {
is := is.New(t)
_, err := pool.Exec(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL", table))
is.NoErr(err)
query := fmt.Sprintf(`DELETE FROM %s WHERE id = 3`, table)
_, err = pool.Exec(ctx, query)
is.NoErr(err)
},
wantErr: false,
want: sdk.Record{
Operation: sdk.OperationDelete,
Metadata: map[string]string{
sdk.MetadataCollection: table,
},
Key: sdk.StructuredData{"id": int64(3)},
Payload: sdk.Change{
Before: sdk.StructuredData{
"id": int64(3),
"key": []uint8("3"),
"column1": "baz",
"column2": int32(789),
"column3": false,
"column4": nil,
"column5": int64(9),
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
is := is.New(t)
now := time.Now()

// execute change
query := fmt.Sprintf(tt.setupQuery, table)
_, err := pool.Exec(ctx, query)
is.NoErr(err)
tt.setup(t)

// fetch the change
nextCtx, cancel := context.WithTimeout(ctx, time.Second*10)
Expand Down
10 changes: 10 additions & 0 deletions source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,21 @@ func (h *CDCHandler) handleDelete(
return fmt.Errorf("failed to decode old values: %w", err)
}

if err := h.updateAvroSchema(rel, msg.OldTuple); err != nil {
return fmt.Errorf("failed to update avro schema: %w", err)
}

rec := sdk.Util.Source.NewRecordDelete(
h.buildPosition(lsn),
h.buildRecordMetadata(rel),
h.buildRecordKey(oldValues, rel.RelationName),
)

rec.Payload = sdk.Change{
Before: h.buildRecordPayload(oldValues),
After: nil,
}

return h.send(ctx, rec)
}

Expand Down
6 changes: 3 additions & 3 deletions source/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ func Test_Format(t *testing.T) {
{
name: "uuid",
input: []any{
[16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66},
[16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66}, nil,
},
inputOID: []uint32{
pgtype.UUIDOID,
pgtype.UUIDOID, pgtype.UUIDOID,
},
expect: []any{
"bd94ee0b-564f-4088-bf4e-8d5e626caf66",
"bd94ee0b-564f-4088-bf4e-8d5e626caf66", "",
},
},
}
Expand Down
9 changes: 6 additions & 3 deletions source/types/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (

type UUIDFormatter struct{}

// Format returns:
// * string format of Time when connectorn is not builtin
// * time type in UTC when connector is builtin
// Format takes a slice of bytes and returns a UUID in string format
// Returns error when byte array cannot be parsed.
func (UUIDFormatter) Format(v any) (string, error) {
if v == nil {
return "", nil
}

b, ok := v.([16]byte)
if !ok {
return "", fmt.Errorf("failed to parse uuid byte array %v", v)
Expand Down

0 comments on commit d491b85

Please sign in to comment.