Skip to content

Commit

Permalink
Improve handling of keepalive messages and manage server end lag. (#167)
Browse files Browse the repository at this point in the history
* Improve handling of keepalive messages and manage server end lag.

* Ensure expected commit lsn matches that of begin messages.
* Adjust the status update to send the last server lsn
when all data has been written and acked. This change will help with managing
server lag with relatively less frequently updated tables.
  • Loading branch information
lyuboxa committed Jun 13, 2024
1 parent f190c59 commit 9b84f73
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ build:
.PHONY: test
test:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose.yml up --quiet-pull -d --wait
docker compose -f test/docker-compose.yml up --force-recreate --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose.yml down --volumes; \
exit $$ret
Expand Down
68 changes: 68 additions & 0 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/matryer/is"
Expand Down Expand Up @@ -274,6 +275,48 @@ func TestCDCIterator_Next_Fail(t *testing.T) {
})
}

// TestCDCIterator_EnsureLSN checks the write and flush LSNs reported for the
// replication slot before and after a record read from the CDC iterator is
// acknowledged.
func TestCDCIterator_EnsureLSN(t *testing.T) {
	is := is.New(t)
	ctx := context.Background()

	pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
	table := test.SetupTestTable(ctx, t, pool)

	iter := testCDCIterator(ctx, t, pool, table, true)
	<-iter.sub.Ready()

	// Produce a single change for the iterator to pick up.
	insertQuery := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
VALUES (6, 'bizz', 456, false, 12.3, 14)`, table)
	_, err := pool.Exec(ctx, insertQuery)
	is.NoErr(err)

	rec, err := iter.Next(ctx)
	is.NoErr(err)

	pos, err := position.ParseSDKPosition(rec.Position)
	is.NoErr(err)

	recordLSN, err := pos.LSN()
	is.NoErr(err)

	// Before the ack: the record's LSN is reported as written, while the
	// flushed LSN still lags behind it.
	written, flushed, err := fetchSlotStats(t, pool, table) // table is the slot name
	is.NoErr(err)

	is.Equal(recordLSN, written)
	is.True(flushed < recordLSN)

	is.NoErr(iter.Ack(ctx, rec.Position))
	time.Sleep(2 * time.Second) // wait for at least two status updates

	// After the ack: written and flushed have converged at or past the
	// record's LSN.
	written, flushed, err = fetchSlotStats(t, pool, table) // table is the slot name
	is.NoErr(err)

	is.True(recordLSN <= written)
	is.True(recordLSN <= flushed)
	is.Equal(written, flushed)
}

func TestCDCIterator_Ack(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -346,6 +389,8 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
i, err := NewCDCIterator(ctx, &pool.Config().ConnConfig.Config, config)
is.NoErr(err)

i.sub.StatusTimeout = 1 * time.Second

if start {
is.NoErr(i.StartSubscriber(ctx))
}
Expand All @@ -361,3 +406,26 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl

return i
}

// fetchSlotStats polls pg_stat_replication until it can read the write and
// flush LSNs of the connection attached to the replication slot slotName, and
// returns them in that order.
//
// A failed query is retried every 100ms. Polling is bounded by a 15 second
// timeout; once it expires, zero LSNs are returned together with the last
// query error (or the context error if the timeout hits between attempts).
func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LSN, pglogrepl.LSN, error) {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
	defer cancel()

	// The query is identical for every attempt, so build it once.
	query := fmt.Sprintf(`SELECT write_lsn, flush_lsn
FROM pg_stat_replication s JOIN pg_replication_slots rs ON s.pid = rs.active_pid
WHERE rs.slot_name = '%s'`, slotName)

	var writeLSN, flushLSN pglogrepl.LSN
	for {
		err := c.QueryRow(ctx, query).Scan(&writeLSN, &flushLSN)
		if err == nil {
			return writeLSN, flushLSN, nil
		}
		// Give up as soon as the context is done, even when the returned error
		// does not wrap context.DeadlineExceeded; otherwise the loop could
		// retry forever after the timeout has expired.
		if errors.Is(err, context.DeadlineExceeded) || ctx.Err() != nil {
			return 0, 0, err
		}

		// Wait before the next attempt, but do not outlive the timeout.
		select {
		case <-ctx.Done():
			return 0, 0, ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}
1 change: 0 additions & 1 deletion source/logrepl/combined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func TestCombinedIterator_Next(t *testing.T) {

t.Run("next_connector_resume_cdc_6", func(t *testing.T) {
is := is.New(t)

i, err := NewCombinedIterator(ctx, pool, Config{
Position: lastPos,
Tables: []string{table},
Expand Down
33 changes: 20 additions & 13 deletions source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CDCHandler struct {
tableKeys map[string]string
relationSet *internal.RelationSet
out chan<- sdk.Record
lastTXLSN pglogrepl.LSN
}

func NewCDCHandler(
Expand All @@ -45,35 +46,41 @@ func NewCDCHandler(
}

// Handle is the handler function that receives all logical replication messages.
func (h *CDCHandler) Handle(ctx context.Context, m pglogrepl.Message, lsn pglogrepl.LSN) error {
// It returns the given lsn when an insert, update or delete message was handled
// successfully, and a zero LSN for every other message type or on error.
func (h *CDCHandler) Handle(ctx context.Context, m pglogrepl.Message, lsn pglogrepl.LSN) (pglogrepl.LSN, error) {
// Trace every incoming message with its LSN and type before dispatching on it.
sdk.Logger(ctx).Trace().
Str("lsn", lsn.String()).
Str("messageType", m.Type().String()).
Msg("handler received pglogrepl.Message")

switch m := m.(type) {
case *pglogrepl.RelationMessage:
// We have to add the Relations to our Set so that we can
// decode our own output
// We have to add the Relations to our Set so that we can decode our own output
h.relationSet.Add(m)
case *pglogrepl.InsertMessage:
err := h.handleInsert(ctx, m, lsn)
if err != nil {
return fmt.Errorf("logrepl handler insert: %w", err)
if err := h.handleInsert(ctx, m, lsn); err != nil {
return 0, fmt.Errorf("logrepl handler insert: %w", err)
}
return lsn, nil
case *pglogrepl.UpdateMessage:
err := h.handleUpdate(ctx, m, lsn)
if err != nil {
return fmt.Errorf("logrepl handler update: %w", err)
if err := h.handleUpdate(ctx, m, lsn); err != nil {
return 0, fmt.Errorf("logrepl handler update: %w", err)
}
return lsn, nil
case *pglogrepl.DeleteMessage:
err := h.handleDelete(ctx, m, lsn)
if err != nil {
return fmt.Errorf("logrepl handler delete: %w", err)
if err := h.handleDelete(ctx, m, lsn); err != nil {
return 0, fmt.Errorf("logrepl handler delete: %w", err)
}
return lsn, nil
case *pglogrepl.BeginMessage:
// Remember the final LSN announced by the begin message so that the
// following commit message can be checked against it.
h.lastTXLSN = m.FinalLSN
case *pglogrepl.CommitMessage:
// A commit whose LSN differs from the one recorded at begin is reported as
// an error. The check is skipped while no begin LSN has been recorded
// (lastTXLSN is still zero).
if h.lastTXLSN != 0 && h.lastTXLSN != m.CommitLSN {
return 0, fmt.Errorf("out of order commit %s, expected %s", m.CommitLSN, h.lastTXLSN)
}
}

return nil
// Message types that fall through the switch yield a zero LSN.
return 0, nil
}

// handleInsert formats a Record with INSERT event data from Postgres and sends
Expand Down
110 changes: 78 additions & 32 deletions source/logrepl/internal/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ const (

// Subscription manages a subscription to a logical replication slot.
type Subscription struct {
SlotName string
Publication string
Tables []string
StartLSN pglogrepl.LSN
Handler Handler
StatusTimeout time.Duration
TXSnapshotID string
SlotName string
Publication string
Tables []string
StartLSN pglogrepl.LSN
ConsistentSlotLSN pglogrepl.LSN
Handler Handler
StatusTimeout time.Duration
TXSnapshotID string

conn *pgconn.PgConn

Expand All @@ -50,11 +51,12 @@ type Subscription struct {
done chan struct{}
doneErr error

walWritten pglogrepl.LSN
walFlushed pglogrepl.LSN
walWritten pglogrepl.LSN
walFlushed pglogrepl.LSN
serverWALEnd pglogrepl.LSN
}

type Handler func(context.Context, pglogrepl.Message, pglogrepl.LSN) error
type Handler func(context.Context, pglogrepl.Message, pglogrepl.LSN) (pglogrepl.LSN, error)

// CreateSubscription initializes the logical replication subscriber by creating the replication slot.
func CreateSubscription(
Expand All @@ -66,6 +68,8 @@ func CreateSubscription(
startLSN pglogrepl.LSN,
h Handler,
) (*Subscription, error) {
var slotLSN pglogrepl.LSN

result, err := pglogrepl.CreateReplicationSlot(
ctx,
conn,
Expand All @@ -87,14 +91,26 @@ func CreateSubscription(
Msgf("replication slot %q already exists", slotName)
}

// If a ConsistentPoint is available use it, otherwise fall back to the startLSN
slotLSN, err = pglogrepl.ParseLSN(result.ConsistentPoint)
if err != nil {
if startLSN == 0 {
sdk.Logger(ctx).Warn().
Msg("start LSN is zero, using existing replication slot without position")
}

slotLSN = startLSN
}

return &Subscription{
SlotName: slotName,
Publication: publication,
Tables: tables,
StartLSN: startLSN,
Handler: h,
StatusTimeout: 10 * time.Second,
TXSnapshotID: result.SnapshotName,
SlotName: slotName,
Publication: publication,
Tables: tables,
StartLSN: startLSN,
ConsistentSlotLSN: slotLSN,
Handler: h,
StatusTimeout: 10 * time.Second,
TXSnapshotID: result.SnapshotName,

conn: conn,

Expand Down Expand Up @@ -158,13 +174,11 @@ func (s *Subscription) listen(ctx context.Context) error {

switch copyDataMsg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
err := s.handlePrimaryKeepaliveMessage(ctx, copyDataMsg)
if err != nil {
if err := s.handlePrimaryKeepaliveMessage(ctx, copyDataMsg); err != nil {
return err
}
case pglogrepl.XLogDataByteID:
err := s.handleXLogData(ctx, copyDataMsg)
if err != nil {
if err := s.handleXLogData(ctx, copyDataMsg); err != nil {
return err
}
default:
Expand All @@ -184,11 +198,15 @@ func (s *Subscription) handlePrimaryKeepaliveMessage(ctx context.Context, copyDa
if err != nil {
return fmt.Errorf("failed to parse primary keepalive message: %w", err)
}

atomic.StoreUint64((*uint64)(&s.serverWALEnd), uint64(pkm.ServerWALEnd))

if pkm.ReplyRequested {
if err = s.sendStandbyStatusUpdate(ctx); err != nil {
if err := s.sendStandbyStatusUpdate(ctx); err != nil {
return fmt.Errorf("failed to send status: %w", err)
}
}

return nil
}

Expand All @@ -210,13 +228,21 @@ func (s *Subscription) handleXLogData(ctx context.Context, copyDataMsg *pgproto3
return fmt.Errorf("invalid message: %w", err)
}

if err = s.Handler(ctx, logicalMsg, xld.WALStart); err != nil {
writtenLSN, err := s.Handler(ctx, logicalMsg, xld.WALStart)
if err != nil {
return fmt.Errorf("handler error: %w", err)
}

if xld.WALStart > 0 {
s.walWritten = xld.WALStart
// When starting on an existing slot without having a consistent slot LSN,
// set the flushed WAL LSN to just before the received LSN
if s.walWritten == 0 && s.ConsistentSlotLSN == 0 {
atomic.StoreUint64((*uint64)(&s.walFlushed), uint64(writtenLSN-1))
}

if writtenLSN > 0 {
s.walWritten = writtenLSN
}

return nil
}

Expand Down Expand Up @@ -315,20 +341,40 @@ func (s *Subscription) sendStandbyStatusUpdate(ctx context.Context) error {
return fmt.Errorf("walWrite (%s) should be >= walFlush (%s)", s.walWritten, walFlushed)
}

// N.B. Manage replication slot lag, by responding with the last server LSN, when
// all previous slot relevant msgs have been written and flushed
replyWithWALEnd := walFlushed == s.walWritten && walFlushed < s.serverWALEnd

sdk.Logger(ctx).Trace().
Str("walWrite", s.walWritten.String()).
Str("walFlush", walFlushed.String()).
Str("walApply", walFlushed.String()).
Stringer("wal_write", s.walWritten).
Stringer("wal_flush", walFlushed).
Stringer("server_wal_end", s.serverWALEnd).
Bool("server_wal_end_sent", replyWithWALEnd).
Msg("sending standby status update")

err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
if replyWithWALEnd {
if err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
WALWritePosition: s.serverWALEnd,
}); err != nil {
return fmt.Errorf("failed to send standby status update with server end lsn: %w", err)
}

return nil
}

// No messages have been flushed, in this case set the flushed
// to just before the slot consistent LSN. This prevents flushed
// to be automatically assigned the written LSN by the pglogrepl pkg
if walFlushed == 0 {
walFlushed = s.ConsistentSlotLSN - 1
}

if err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
WALWritePosition: s.walWritten,
WALFlushPosition: walFlushed,
WALApplyPosition: walFlushed,
ClientTime: time.Now(),
ReplyRequested: false,
})
if err != nil {
}); err != nil {
return fmt.Errorf("failed to send standby status update: %w", err)
}

Expand Down
Loading

0 comments on commit 9b84f73

Please sign in to comment.