Skip to content

Commit

Permalink
Add default logrepl autoCleanup mode to remove replication slots and …
Browse files Browse the repository at this point in the history
…publications (#139)

* Add default logrepl autoCleanup mode to remove replication slots and publications

The new option `logrepl.autoCleanup` is not only valid in the context of logical replication.
The default is set to true and will remove the replication slot and publication when the connector is deleted.

This change is a departure from the current implementation, in the following areas:

1. Replication slots are not `temporary` any longer and will persist through pipeline state changes.
2. An implication of 1) the slot will need to be cleaned up, hence why the `autoCleanup` option is set to true.

In cases where the replication slots and publications are pre-created, ensure `logrepl.autoCleanup` is set to `false`.

* cr; add note in docs about slots/pub being deleted on connector deletion
  • Loading branch information
lyuboxa authored May 1, 2024
1 parent 916c195 commit 6d5cb81
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 14 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ Example configuration for CDC features:
}
```

:warning: When the connector or pipeline is deleted, the connector will automatically attempt to delete the replication slot and publication.
This is the default behaviour and can be disabled by setting `logrepl.autoCleanup` to `false`.

## Key Handling

The connector will automatically look up the primary key column for the specified tables. If that can't be determined,
Expand All @@ -63,6 +66,7 @@ the connector will return an error.
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |
| `logrepl.autoCleanup` | Whether or not to cleanup the replication slot and pub when connector is deleted | false | `true` |
| ~~`table`~~ | List of table names to read from, separated by comma. **Deprecated: use `tables` instead.** | false | |

# Destination
Expand Down
20 changes: 20 additions & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,26 @@ func (s *Source) Teardown(ctx context.Context) error {
return errors.Join(errs...)
}

func (s *Source) LifecycleOnDeleted(ctx context.Context, _ map[string]string) error {
switch s.config.CDCMode {
case source.CDCModeAuto:
fallthrough // TODO: Adjust as `auto` changes.
case source.CDCModeLogrepl:
if !s.config.LogreplAutoCleanup {
sdk.Logger(ctx).Warn().Msg("Skipping logrepl auto cleanup")
return nil
}

return logrepl.Cleanup(ctx, logrepl.CleanupConfig{
URL: s.config.URL,
SlotName: s.config.LogreplSlotName,
PublicationName: s.config.LogreplPublicationName,
})
default:
return nil
}
}

func (s *Source) readingAllTables() bool {
return len(s.config.Tables) == 1 && s.config.Tables[0] == source.AllTablesWildcard
}
Expand Down
4 changes: 4 additions & 0 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type Config struct {
// LogreplSlotName determines the replication slot name in case the
// connector uses logical replication to listen to changes (see CDCMode).
LogreplSlotName string `json:"logrepl.slotName" default:"conduitslot"`

// LogreplAutoCleanup determines if the replication slot and publication should be
// removed when the connector is deleted.
LogreplAutoCleanup bool `json:"logrepl.autoCleanup" default:"true"`
}

// Validate validates the provided config values.
Expand Down
14 changes: 10 additions & 4 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ func TestIterator_Next(t *testing.T) {
pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
table := test.SetupTestTable(ctx, t, pool)
i := testIterator(ctx, t, pool, table)
t.Cleanup(func() {
is.NoErr(i.Teardown(ctx))
})

// wait for subscription to be ready
<-i.sub.Ready()
Expand Down Expand Up @@ -72,7 +69,7 @@ func TestIterator_Next(t *testing.T) {
{
name: "should detect update",
setupQuery: `UPDATE %s
SET column1 = 'test cdc updates'
SET column1 = 'test cdc updates'
WHERE key = '1'`,
wantErr: false,
want: sdk.Record{
Expand Down Expand Up @@ -146,5 +143,14 @@ func testIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, table s

i, err := NewCDCIterator(ctx, pool, config)
is.NoErr(err)
t.Cleanup(func() {
is.NoErr(i.Teardown(ctx))
is.NoErr(Cleanup(ctx, CleanupConfig{
URL: pool.Config().ConnString(),
SlotName: table,
PublicationName: table,
}))
})

return i
}
83 changes: 83 additions & 0 deletions source/logrepl/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logrepl

import (
"context"
"errors"
"fmt"

"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
)

type CleanupConfig struct {
URL string
SlotName string
PublicationName string
}

// Cleanup drops the provided replication and publication.
func Cleanup(ctx context.Context, c CleanupConfig) error {
pgconfig, err := pgconn.ParseConfig(c.URL)
if err != nil {
return fmt.Errorf("failed to parse config URL: %w", err)
}

if pgconfig.RuntimeParams == nil {
pgconfig.RuntimeParams = make(map[string]string)
}
pgconfig.RuntimeParams["replication"] = "database"

conn, err := pgconn.ConnectConfig(ctx, pgconfig)
if err != nil {
return fmt.Errorf("could not establish replication connection: %w", err)
}
defer conn.Close(ctx)

var errs []error

if c.SlotName != "" {
if err := pglogrepl.DropReplicationSlot(
ctx,
conn,
c.SlotName,
pglogrepl.DropReplicationSlotOptions{},
); err != nil {
errs = append(errs, fmt.Errorf("failed to clean up replication slot %q: %w", c.SlotName, err))
}
} else {
sdk.Logger(ctx).Warn().
Msg("cleanup: skipping replication slot cleanup, name is empty")
}

if c.PublicationName != "" {
if err := internal.DropPublication(
ctx,
conn,
c.PublicationName,
internal.DropPublicationOptions{IfExists: true},
); err != nil {
errs = append(errs, fmt.Errorf("failed to clean up publication %q: %w", c.PublicationName, err))
}
} else {
sdk.Logger(ctx).Warn().
Msg("cleanup: skipping publication cleanup, name is empty")
}

return errors.Join(errs...)
}
111 changes: 111 additions & 0 deletions source/logrepl/cleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logrepl

import (
"context"
"errors"
"strings"
"testing"

"github.com/conduitio/conduit-connector-postgres/test"
"github.com/matryer/is"
)

func Test_Cleanup(t *testing.T) {
conn := test.ConnectSimple(context.Background(), t, test.RepmgrConnString)

tests := []struct {
desc string
setup func(t *testing.T)
conf CleanupConfig

wantErr error
}{
{
desc: "drops slot and pub",
conf: CleanupConfig{
URL: test.RepmgrConnString,
SlotName: "conduitslot1",
PublicationName: "conduitpub1",
},
setup: func(t *testing.T) {
test.CreatePublication(t, conn, "conduitpub1")
test.CreateReplicationSlot(t, conn, "conduitslot1")
},
},
{
desc: "drops pub, slot unspecified",
conf: CleanupConfig{
URL: test.RepmgrConnString,
PublicationName: "conduitpub2",
},
setup: func(t *testing.T) {
test.CreatePublication(t, conn, "conduitpub2")
},
},
{
desc: "drops slot, pub unspecified",
conf: CleanupConfig{
URL: test.RepmgrConnString,
SlotName: "conduitslot3",
},
setup: func(t *testing.T) {
test.CreateReplicationSlot(t, conn, "conduitslot3")
},
},
{
desc: "drops pub, slot missing",
conf: CleanupConfig{
URL: test.RepmgrConnString,
SlotName: "conduitslot4",
PublicationName: "conduitpub4",
},
setup: func(t *testing.T) {
test.CreatePublication(t, conn, "conduitpub4")
},
wantErr: errors.New(`replication slot "conduitslot4" does not exist`),
},
{
desc: "drops slot, pub missing", // no op
conf: CleanupConfig{
URL: test.RepmgrConnString,
SlotName: "conduitslot5",
PublicationName: "conduitpub5",
},
setup: func(t *testing.T) {
test.CreateReplicationSlot(t, conn, "conduitslot5")
},
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
is := is.New(t)

if tc.setup != nil {
tc.setup(t)
}

err := Cleanup(context.Background(), tc.conf)

if tc.wantErr != nil {
is.True(strings.Contains(err.Error(), tc.wantErr.Error()))
} else {
is.NoErr(err)
}
})
}
}
11 changes: 1 addition & 10 deletions source/logrepl/internal/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,16 +312,8 @@ func (s *Subscription) createPublication(ctx context.Context, conn *pgconn.PgCon
if !errors.As(err, &pgerr) || pgerr.Code != pgDuplicateObjectErrorCode {
return err
}
} else {
// publication was created successfully, drop it when we're done
s.addCleanup(func(ctx context.Context) error {
err := DropPublication(ctx, conn, s.Publication, DropPublicationOptions{})
if err != nil {
return fmt.Errorf("failed to drop publication: %w", err)
}
return nil
})
}

return nil
}

Expand All @@ -335,7 +327,6 @@ func (s *Subscription) createReplicationSlot(ctx context.Context, conn *pgconn.P
s.SlotName,
pgOutputPlugin,
pglogrepl.CreateReplicationSlotOptions{
Temporary: true, // replication slot is dropped when we disconnect
SnapshotAction: "EXPORT_SNAPSHOT",
Mode: pglogrepl.LogicalReplication,
},
Expand Down
6 changes: 6 additions & 0 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,39 @@ func SetupTestTable(ctx context.Context, t *testing.T, conn Querier) string {
return table
}

func CreateReplicationSlot(t *testing.T, conn *pgx.Conn, slotName string) {
is := is.New(t)

_, err := conn.Exec(
context.Background(),
"SELECT pg_create_logical_replication_slot($1, $2)",
slotName,
"pgoutput",
)
is.NoErr(err)

t.Cleanup(func() {
_, err := conn.Exec(
context.Background(),
"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name=$1",
slotName,
)
is.NoErr(err)
})
}

func CreatePublication(t *testing.T, conn *pgx.Conn, pubName string) {
is := is.New(t)

_, err := conn.Exec(context.Background(), "CREATE PUBLICATION "+pubName+" FOR ALL TABLES")
is.NoErr(err)

t.Cleanup(func() {
_, err := conn.Exec(context.Background(), "DROP PUBLICATION IF EXISTS "+pubName)
is.NoErr(err)
})
}

func RandomIdentifier(t *testing.T) string {
return fmt.Sprintf("conduit_%v_%d",
strings.ReplaceAll(strings.ToLower(t.Name()), "/", "_"),
Expand Down

0 comments on commit 6d5cb81

Please sign in to comment.