Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add snapshot support to logrepl mode #141

Merged
merged 11 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ Example configuration for CDC features:
}
```

:warning: When the connector or pipeline is deleted, the connector will automatically attempt to delete the replication slot and publication.
This is the default behaviour and can be disabled by setting `logrepl.autoCleanup` to `false`.
:warning: When the connector or pipeline is deleted, the connector will automatically attempt to delete the replication slot and publication. This is the default behaviour and can be disabled by setting `logrepl.autoCleanup` to `false`.

## Key Handling

The connector will automatically look up the primary key column for the specified tables. If that can't be determined,
The connector will automatically look up the primary key column for the specified tables. If that can't be determined,
the connector will return an error.

## Configuration Options
Expand Down
13 changes: 4 additions & 9 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,13 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
// switches to long polling if it's not. For now use logical replication
fallthrough
case source.CDCModeLogrepl:
if s.config.SnapshotMode == source.SnapshotModeInitial {
// TODO create snapshot iterator for logical replication and pass
// the snapshot mode in the config
logger.Warn().Msg("Snapshot not supported yet in logical replication mode")
}

i, err := logrepl.NewCDCIterator(ctx, s.pool, logrepl.Config{
i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{
Position: pos,
SlotName: s.config.LogreplSlotName,
PublicationName: s.config.LogreplPublicationName,
Tables: s.config.Tables,
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand All @@ -118,8 +113,8 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
}

snap, err := snapshot.NewIterator(ctx, pool, snapshot.Config{
Tables: s.config.Tables,
TablesKeys: s.tableKeys,
Tables: s.config.Tables,
TableKeys: s.tableKeys,
})
if err != nil {
return fmt.Errorf("failed to create long polling iterator: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Config struct {
URL string `json:"url" validate:"required"`
// Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2".
// Use "*" if you'd like to listen to all tables.
Tables []string `json:"tables"` // TODO: make it required once `Table` is removed.
Tables []string `json:"tables"`
// Deprecated: use `tables` instead.
Table []string `json:"table"`

Expand Down
206 changes: 128 additions & 78 deletions source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,83 +18,119 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
"github.com/conduitio/conduit-connector-postgres/source/position"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

const (
// subscriberDoneTimeout is the maximum time Teardown waits for the
// subscription's Done channel before closing the replication connection.
subscriberDoneTimeout = time.Second * 2
)

// Config holds configuration values for CDCIterator.
type Config struct {
Position sdk.Position
type CDCConfig struct {
LSN pglogrepl.LSN
SlotName string
PublicationName string
Tables []string
TableKeys map[string]string
}

// LSN converts the configured position into a log sequence number.
// An empty position yields the zero LSN and no error. A position that cannot
// be converted by PositionToLSN results in a wrapped error.
func (c Config) LSN() (pglogrepl.LSN, error) {
	var parsed pglogrepl.LSN

	// Only attempt a conversion when a position was actually supplied.
	if len(c.Position) != 0 {
		var err error
		if parsed, err = PositionToLSN(c.Position); err != nil {
			return 0, fmt.Errorf("failed to parse position: %w", err)
		}
	}

	return parsed, nil
}

// CDCIterator asynchronously listens for events from the logical replication
// slot and returns them to the caller through Next.
type CDCIterator struct {
config Config
config CDCConfig
records chan sdk.Record
pgconn *pgconn.PgConn

sub *internal.Subscription
}

// NewCDCIterator sets up the subscription to a logical replication slot and
// starts a goroutine that listens to events. The goroutine will keep running
// until either the context is canceled or Teardown is called.
func NewCDCIterator(ctx context.Context, connPool *pgxpool.Pool, config Config) (*CDCIterator, error) {
i := &CDCIterator{
config: config,
records: make(chan sdk.Record),
// NewCDCIterator initializes logical replication by creating the publication and subscription manager.
func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*CDCIterator, error) {
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved
conn, err := pgconn.ConnectConfig(ctx, withReplication(pgconf))
if err != nil {
return nil, fmt.Errorf("could not establish replication connection: %w", err)
}

err := i.attachSubscription(connPool.Config().ConnConfig.Config)
if err != nil {
return nil, fmt.Errorf("failed to setup subscription: %w", err)
if err := internal.CreatePublication(
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
ctx,
conn,
c.PublicationName,
internal.CreatePublicationOptions{Tables: c.Tables},
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved
); err != nil {
// If creating the publication fails with code 42710, this means
// the publication already exists.
if !internal.IsPgDuplicateErr(err) {
return nil, err
}

sdk.Logger(ctx).Warn().
Msgf("Publication %q already exists.", c.PublicationName)
}

go i.listen(ctx)
records := make(chan sdk.Record)

sub, err := internal.CreateSubscription(
ctx,
conn,
c.SlotName,
c.PublicationName,
c.Tables,
c.LSN,
NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records).Handle,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize subscription: %w", err)
}

return i, nil
return &CDCIterator{
config: c,
records: records,
pgconn: conn,
sub: sub,
}, nil
}

// listen should be called in a goroutine. It starts the subscription and keeps
// it running until the subscription is stopped or the context is canceled.
func (i *CDCIterator) listen(ctx context.Context) {
// StartSubscriber starts the logical replication service in the background.
// Blocks until the subscription becomes ready.
func (i *CDCIterator) StartSubscriber(ctx context.Context) error {
sdk.Logger(ctx).Info().
Str("slot", i.config.SlotName).
Str("publication", i.config.PublicationName).
Msg("starting logical replication")
Msg("Starting logical replication")

err := i.sub.Start(ctx)
if err != nil {
// log it to be safe we don't miss the error, but use warn level
// because the error will most probably be still propagated to Conduit
// and might be recovered from
sdk.Logger(ctx).Warn().Err(err).Msg("subscription returned an error")
}
go func() {
if err := i.sub.Run(ctx); err != nil {
sdk.Logger(ctx).Error().
Err(err).
Msg("replication exited with an error")
}
}()

<-i.sub.Ready()

sdk.Logger(ctx).Info().
Str("slot", i.config.SlotName).
Str("publication", i.config.PublicationName).
Msg("Logical replication started")

return nil
}

// Next returns the next record retrieved from the subscription. This call will
// block until either a record is returned from the subscription, the
// subscription stops because of an error or the context gets canceled.
// Returns an error when the subscription has not been started.
func (i *CDCIterator) Next(ctx context.Context) (sdk.Record, error) {
if !i.subscriberReady() {
return sdk.Record{}, errors.New("logical replication has not been started")
}

for {
select {
case <-ctx.Done():
Expand All @@ -118,11 +154,25 @@ func (i *CDCIterator) Next(ctx context.Context) (sdk.Record, error) {
}

// Ack forwards the acknowledgment to the subscription.
func (i *CDCIterator) Ack(_ context.Context, pos sdk.Position) error {
lsn, err := PositionToLSN(pos)
func (i *CDCIterator) Ack(_ context.Context, sdkPos sdk.Position) error {
pos, err := position.ParseSDKPosition(sdkPos)
if err != nil {
return fmt.Errorf("failed to parse position: %w", err)
return err
}

if pos.Type != position.TypeCDC {
return fmt.Errorf("invalid type %q for CDC position", pos.Type.String())
}

lsn, err := pos.LSN()
if err != nil {
return err
}

if lsn == 0 {
return fmt.Errorf("cannot ack zero position")
}

i.sub.Ack(lsn)
return nil
}
Expand All @@ -131,50 +181,50 @@ func (i *CDCIterator) Ack(_ context.Context, pos sdk.Position) error {
// or the context gets canceled. If the subscription stopped with an unexpected
// error, the error is returned.
func (i *CDCIterator) Teardown(ctx context.Context) error {
i.sub.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-i.sub.Done():
err := i.sub.Err()
if errors.Is(err, context.Canceled) {
// this was a controlled stop
return nil
} else if err != nil {
return fmt.Errorf("logical replication error: %w", err)
defer func() {
// Give the subscription 2 seconds to settle.
select {
case <-i.sub.Done():
case <-time.After(subscriberDoneTimeout):
}

i.pgconn.Close(ctx)
}()

if !i.subscriberReady() {
return nil
}

i.sub.Stop()
return i.sub.Wait(ctx)
}

// attachSubscription determines the starting LSN and key column of the source
// table and prepares a subscription.
func (i *CDCIterator) attachSubscription(connConfig pgconn.Config) error {
lsn, err := i.config.LSN()
if err != nil {
return err
// subscriberReady reports whether the subscription has signalled readiness.
// It performs a non-blocking receive on the subscription's Ready channel:
// true if the receive can proceed immediately, false otherwise.
func (i *CDCIterator) subscriberReady() bool {
	ready := false

	select {
	case <-i.sub.Ready():
		ready = true
	default:
		// Not ready yet; fall through without blocking.
	}

	return ready
}

// make sure we have all table keys
for _, tableName := range i.config.Tables {
if i.config.TableKeys[tableName] == "" {
return fmt.Errorf("missing key for table %q", tableName)
}
}
// TXSnapshotID returns the transaction snapshot ID held by the underlying
// subscription, which is received when the replication slot is created.
// The value can be empty when the iterator is resuming.
func (i *CDCIterator) TXSnapshotID() string {
return i.sub.TXSnapshotID
}

sub := internal.NewSubscription(
connConfig,
i.config.SlotName,
i.config.PublicationName,
i.config.Tables,
lsn,
NewCDCHandler(
internal.NewRelationSet(),
i.config.TableKeys,
i.records,
).Handle,
)
// withReplication adds the `replication` parameter to the connection config.
// This will upgrade a regular command connection to accept replication commands.
func withReplication(pgconf *pgconn.Config) *pgconn.Config {
c := pgconf.Copy()
if c.RuntimeParams == nil {
c.RuntimeParams = make(map[string]string)
}
// enable replication on connection
c.RuntimeParams["replication"] = "database"

i.sub = sub
return nil
return c
}
Loading