Skip to content

Commit

Permalink
Add logrepl combined iterator
Browse files Browse the repository at this point in the history
The new combined iterator integrates snapshots with the existing cdc iterator.
Once the snapshot is completed, it will switch to cdc.

When using snapshots, the connector will run both cdc and snapshot at the same time.
This is required because the cdc iterator needs to be initialized and export a consistent
snapshot which is taken at the time of the slot creation. The cdc iterator will need to
keep the connection open until the snapshot is done.

The switch to the cdc iterator is triggered when the snapshot returns an end of records error.
Based on this signal, the cdc subscriber will be started to consume changes from the replication slot.
  • Loading branch information
lyuboxa committed May 10, 2024
1 parent 6d5cb81 commit 0a0ba9a
Show file tree
Hide file tree
Showing 26 changed files with 1,112 additions and 812 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ Example configuration for CDC features:
}
```

:warning: When the connector or pipeline is deleted, the connector will automatically attempt to delete the replication slot and publication.
This is the default behaviour and can be disabled by setting `logrepl.autoCleanup` to `false`.
:warning: When the connector or pipeline is deleted, the connector will automatically attempt to delete the replication slot and publication. This is the default behaviour and can be disabled by setting `logrepl.autoCleanup` to `false`.

## Key Handling

The connector will automatically look up the primary key column for the specified tables. If that can't be determined,
The connector will automatically look up the primary key column for the specified tables. If that can't be determined,
the connector will return an error.

## Configuration Options
Expand Down
13 changes: 4 additions & 9 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,13 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
// switches to long polling if it's not. For now use logical replication
fallthrough
case source.CDCModeLogrepl:
if s.config.SnapshotMode == source.SnapshotModeInitial {
// TODO create snapshot iterator for logical replication and pass
// the snapshot mode in the config
logger.Warn().Msg("Snapshot not supported yet in logical replication mode")
}

i, err := logrepl.NewCDCIterator(ctx, s.pool, logrepl.Config{
i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{
Position: pos,
SlotName: s.config.LogreplSlotName,
PublicationName: s.config.LogreplPublicationName,
Tables: s.config.Tables,
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand All @@ -118,8 +113,8 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
}

snap, err := snapshot.NewIterator(ctx, pool, snapshot.Config{
Tables: s.config.Tables,
TablesKeys: s.tableKeys,
Tables: s.config.Tables,
TableKeys: s.tableKeys,
})
if err != nil {
return fmt.Errorf("failed to create long polling iterator: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Config struct {
URL string `json:"url" validate:"required"`
// Tables is a List of table names to read from, separated by a comma, e.g.:"table1,table2".
// Use "*" if you'd like to listen to all tables.
Tables []string `json:"tables"` // TODO: make it required once `Table` is removed.
Tables []string `json:"tables"`
// Deprecated: use `tables` instead.
Table []string `json:"table"`

Expand Down
194 changes: 116 additions & 78 deletions source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,83 +18,117 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
"github.com/conduitio/conduit-connector-postgres/source/position"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

// Config holds configuration values for CDCIterator.
type Config struct {
Position sdk.Position
type CDCConfig struct {
LSN pglogrepl.LSN
SlotName string
PublicationName string
Tables []string
TableKeys map[string]string
}

func (c Config) LSN() (pglogrepl.LSN, error) {
if len(c.Position) == 0 {
return 0, nil
}
lsn, err := PositionToLSN(c.Position)
if err != nil {
return 0, fmt.Errorf("failed to parse position: %w", err)
}
return lsn, nil
}

// CDCIterator asynchronously listens for events from the logical replication
// slot and returns them to the caller through Next.
type CDCIterator struct {
config Config
config CDCConfig
records chan sdk.Record
pgconn *pgconn.PgConn

sub *internal.Subscription
subStarted atomic.Bool
sub *internal.Subscription
}

// NewCDCIterator sets up the subscription to a logical replication slot and
// starts a goroutine that listens to events. The goroutine will keep running
// until either the context is canceled or Teardown is called.
func NewCDCIterator(ctx context.Context, connPool *pgxpool.Pool, config Config) (*CDCIterator, error) {
i := &CDCIterator{
config: config,
records: make(chan sdk.Record),
// NewCDCIterator initializes logical replication by creating the publication and subscription manager.
func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*CDCIterator, error) {
conn, err := pgconn.ConnectConfig(ctx, withReplication(pgconf))
if err != nil {
return nil, fmt.Errorf("could not establish replication connection: %w", err)
}

err := i.attachSubscription(connPool.Config().ConnConfig.Config)
if err != nil {
return nil, fmt.Errorf("failed to setup subscription: %w", err)
if err := internal.CreatePublication(
ctx,
conn,
c.PublicationName,
internal.CreatePublicationOptions{Tables: c.Tables},
); err != nil {
// If creating the publication fails with code 42710, this means
// the publication already exists.
if !internal.IsPgDuplicateErr(err) {
return nil, err
}

sdk.Logger(ctx).Warn().
Msgf("Publication %q already exists.", c.PublicationName)
}

go i.listen(ctx)
records := make(chan sdk.Record)

sub, err := internal.CreateSubscription(
ctx,
conn,
c.SlotName,
c.PublicationName,
c.Tables,
c.LSN,
NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records).Handle,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize subscription: %w", err)
}

return i, nil
return &CDCIterator{
config: c,
records: records,
pgconn: conn,
sub: sub,
}, nil
}

// listen should be called in a goroutine. It starts the subscription and keeps
// it running until the subscription is stopped or the context is canceled.
func (i *CDCIterator) listen(ctx context.Context) {
// StartSubscriber starts the logical replication service in the background.
// Blocks until the subscription becomes ready.
func (i *CDCIterator) StartSubscriber(ctx context.Context) error {
sdk.Logger(ctx).Info().
Str("slot", i.config.SlotName).
Str("publication", i.config.PublicationName).
Msg("starting logical replication")
Msg("Starting logical replication")

err := i.sub.Start(ctx)
if err != nil {
// log it to be safe we don't miss the error, but use warn level
// because the error will most probably be still propagated to Conduit
// and might be recovered from
sdk.Logger(ctx).Warn().Err(err).Msg("subscription returned an error")
}
go func() {
if err := i.sub.Start(ctx); err != nil {
sdk.Logger(ctx).Error().
Err(err).
Msg("replication exited with an error")
}
}()

<-i.sub.Ready()
i.subStarted.Store(true)

sdk.Logger(ctx).Info().
Str("slot", i.config.SlotName).
Str("publication", i.config.PublicationName).
Msg("Logical replication started")

return nil
}

// Next returns the next record retrieved from the subscription. This call will
// block until either a record is returned from the subscription, the
// subscription stops because of an error or the context gets canceled.
// Returns an error when the subscription has not been started.
func (i *CDCIterator) Next(ctx context.Context) (sdk.Record, error) {
if started := i.subStarted.Load(); !started {
return sdk.Record{}, errors.New("logical replication has not been started")
}

for {
select {
case <-ctx.Done():
Expand All @@ -118,11 +152,25 @@ func (i *CDCIterator) Next(ctx context.Context) (sdk.Record, error) {
}

// Ack forwards the acknowledgment to the subscription.
func (i *CDCIterator) Ack(_ context.Context, pos sdk.Position) error {
lsn, err := PositionToLSN(pos)
func (i *CDCIterator) Ack(_ context.Context, sdkPos sdk.Position) error {
pos, err := position.ParseSDKPosition(sdkPos)
if err != nil {
return fmt.Errorf("failed to parse position: %w", err)
return err
}

if pos.Type != position.TypeCDC {
return fmt.Errorf("invalid type %q for CDC position", pos.Type.String())
}

lsn, err := pos.LSN()
if err != nil {
return err
}

if lsn == 0 {
return fmt.Errorf("cannot ack zero position")
}

i.sub.Ack(lsn)
return nil
}
Expand All @@ -131,50 +179,40 @@ func (i *CDCIterator) Ack(_ context.Context, pos sdk.Position) error {
// or the context gets canceled. If the subscription stopped with an unexpected
// error, the error is returned.
func (i *CDCIterator) Teardown(ctx context.Context) error {
defer i.pgconn.Close(ctx)

if started := i.subStarted.Load(); !started {
return nil
}

i.sub.Stop()

select {
case <-ctx.Done():
// When context is done, the pgconn will be closed hastily
// without waiting for the subscription to be completely stopped.
return ctx.Err()
case <-i.sub.Done():
err := i.sub.Err()
if errors.Is(err, context.Canceled) {
// this was a controlled stop
return nil
} else if err != nil {
return fmt.Errorf("logical replication error: %w", err)
}
return nil
case <-i.sub.Ready():
return i.sub.Wait(ctx)
}
}

// attachSubscription determines the starting LSN and key column of the source
// table and prepares a subscription.
func (i *CDCIterator) attachSubscription(connConfig pgconn.Config) error {
lsn, err := i.config.LSN()
if err != nil {
return err
}
// TXSnapshotID reports the transaction snapshot identifier that is received
// when the replication slot is created. The returned string may be empty
// when the iterator is resuming.
func (i *CDCIterator) TXSnapshotID() string {
	snapshotID := i.sub.TXSnapshotID
	return snapshotID
}

// make sure we have all table keys
for _, tableName := range i.config.Tables {
if i.config.TableKeys[tableName] == "" {
return fmt.Errorf("missing key for table %q", tableName)
}
// withReplication adds the `replication` parameter to the connection config.
// This will upgrade a regular command connection to accept replication commands.
func withReplication(pgconf *pgconn.Config) *pgconn.Config {
c := pgconf.Copy()
if c.RuntimeParams == nil {
c.RuntimeParams = make(map[string]string)
}
// enable replication on connection
c.RuntimeParams["replication"] = "database"

sub := internal.NewSubscription(
connConfig,
i.config.SlotName,
i.config.PublicationName,
i.config.Tables,
lsn,
NewCDCHandler(
internal.NewRelationSet(),
i.config.TableKeys,
i.records,
).Handle,
)

i.sub = sub
return nil
return c
}
Loading

0 comments on commit 0a0ba9a

Please sign in to comment.