From 9b84f73befd1104cb170a290e983827f636f3bd1 Mon Sep 17 00:00:00 2001
From: Lyubo Kamenov
Date: Thu, 13 Jun 2024 13:36:42 -0400
Subject: [PATCH] Improve handling of keepalive messages and manage server end
 lag. (#167)

* Improve handling of keepalive messages and manage server end lag.

* Ensure the expected commit LSN matches that of begin messages.

* Adjust the status update to send the last server LSN when all data has
  been written and acked. This helps manage server lag for tables that are
  updated relatively infrequently.
---
 Makefile                                     |   2 +-
 source/logrepl/cdc_test.go                   |  68 ++++++++++++
 source/logrepl/combined_test.go              |   1 -
 source/logrepl/handler.go                    |  33 +++---
 source/logrepl/internal/subscription.go      | 110 +++++++++++++------
 source/logrepl/internal/subscription_test.go |  22 +++-
 6 files changed, 185 insertions(+), 51 deletions(-)

diff --git a/Makefile b/Makefile
index 54d88d2..5752635 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,7 @@ build:
 .PHONY: test
 test:
 	# run required docker containers, execute integration tests, stop containers after tests
-	docker compose -f test/docker-compose.yml up --quiet-pull -d --wait
+	docker compose -f test/docker-compose.yml up --force-recreate --quiet-pull -d --wait
 	go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
 		docker compose -f test/docker-compose.yml down --volumes; \
 		exit $$ret
diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go
index 392bc18..e01f97b 100644
--- a/source/logrepl/cdc_test.go
+++ b/source/logrepl/cdc_test.go
@@ -27,6 +27,7 @@ import (
 	sdk "github.com/conduitio/conduit-connector-sdk"
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
+	"github.com/jackc/pglogrepl"
 	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/matryer/is"
@@ -274,6 +275,48 @@ func TestCDCIterator_Next_Fail(t *testing.T) {
 	})
 }
 
+func TestCDCIterator_EnsureLSN(t *testing.T) {
+	// t.Skip()
+
+	ctx := context.Background()
+	is := is.New(t)
+
+	pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
+	table := test.SetupTestTable(ctx, t, pool)
+
+	i := testCDCIterator(ctx, t, pool, table, true)
+	<-i.sub.Ready()
+
+	_, err := pool.Exec(ctx, fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
+		VALUES (6, 'bizz', 456, false, 12.3, 14)`, table))
+	is.NoErr(err)
+
+	r, err := i.Next(ctx)
+	is.NoErr(err)
+
+	p, err := position.ParseSDKPosition(r.Position)
+	is.NoErr(err)
+
+	lsn, err := p.LSN()
+	is.NoErr(err)
+
+	writeLSN, flushLSN, err := fetchSlotStats(t, pool, table) // table is the slot name
+	is.NoErr(err)
+
+	is.Equal(lsn, writeLSN)
+	is.True(flushLSN < lsn)
+
+	is.NoErr(i.Ack(ctx, r.Position))
+	time.Sleep(2 * time.Second) // wait for at least two status updates
+
+	writeLSN, flushLSN, err = fetchSlotStats(t, pool, table) // table is the slot name
+	is.NoErr(err)
+
+	is.True(lsn <= writeLSN)
+	is.True(lsn <= flushLSN)
+	is.Equal(writeLSN, flushLSN)
+}
+
 func TestCDCIterator_Ack(t *testing.T) {
 	ctx := context.Background()
 
@@ -346,6 +389,8 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
 	i, err := NewCDCIterator(ctx, &pool.Config().ConnConfig.Config, config)
 	is.NoErr(err)
 
+	i.sub.StatusTimeout = 1 * time.Second
+
 	if start {
 		is.NoErr(i.StartSubscriber(ctx))
 	}
@@ -361,3 +406,26 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
 
 	return i
 }
+
+func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LSN, pglogrepl.LSN, error) {
+	t.Helper()
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+	defer cancel()
+
+	var writeLSN, flushLSN pglogrepl.LSN
+	for {
+		query := fmt.Sprintf(`SELECT write_lsn, flush_lsn
+			FROM pg_stat_replication s JOIN pg_replication_slots rs ON s.pid = rs.active_pid
+			WHERE rs.slot_name = '%s'`, slotName)
+
+		err := c.QueryRow(ctx, query).Scan(&writeLSN, &flushLSN)
+		if err == nil {
+			return writeLSN, flushLSN, nil
+		}
+		if errors.Is(err, context.DeadlineExceeded) {
+			return 0, 0, err
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}
diff --git a/source/logrepl/combined_test.go b/source/logrepl/combined_test.go
index 47ef7ad..daf6cc3 100644
--- a/source/logrepl/combined_test.go
+++ b/source/logrepl/combined_test.go
@@ -211,7 +211,6 @@ func TestCombinedIterator_Next(t *testing.T) {
 
 	t.Run("next_connector_resume_cdc_6", func(t *testing.T) {
 		is := is.New(t)
-
 		i, err := NewCombinedIterator(ctx, pool, Config{
 			Position: lastPos,
 			Tables:   []string{table},
diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go
index 87719bf..6333a61 100644
--- a/source/logrepl/handler.go
+++ b/source/logrepl/handler.go
@@ -30,6 +30,7 @@ type CDCHandler struct {
 	tableKeys   map[string]string
 	relationSet *internal.RelationSet
 	out         chan<- sdk.Record
+	lastTXLSN   pglogrepl.LSN
 }
 
 func NewCDCHandler(
@@ -45,7 +46,8 @@ func NewCDCHandler(
 }
 
 // Handle is the handler function that receives all logical replication messages.
-func (h *CDCHandler) Handle(ctx context.Context, m pglogrepl.Message, lsn pglogrepl.LSN) error {
+// Returns non-zero LSN when a record was emitted for the message.
+func (h *CDCHandler) Handle(ctx context.Context, m pglogrepl.Message, lsn pglogrepl.LSN) (pglogrepl.LSN, error) {
 	sdk.Logger(ctx).Trace().
 		Str("lsn", lsn.String()).
 		Str("messageType", m.Type().String()).
@@ -53,27 +55,32 @@ func (h *CDCHandler) Handle(ctx context.Context, m pglogrepl.Message, lsn pglogr
 
 	switch m := m.(type) {
 	case *pglogrepl.RelationMessage:
-		// We have to add the Relations to our Set so that we can
-		// decode our own output
+		// We have to add the Relations to our Set so that we can decode our own output
 		h.relationSet.Add(m)
 	case *pglogrepl.InsertMessage:
-		err := h.handleInsert(ctx, m, lsn)
-		if err != nil {
-			return fmt.Errorf("logrepl handler insert: %w", err)
+		if err := h.handleInsert(ctx, m, lsn); err != nil {
+			return 0, fmt.Errorf("logrepl handler insert: %w", err)
 		}
+		return lsn, nil
 	case *pglogrepl.UpdateMessage:
-		err := h.handleUpdate(ctx, m, lsn)
-		if err != nil {
-			return fmt.Errorf("logrepl handler update: %w", err)
+		if err := h.handleUpdate(ctx, m, lsn); err != nil {
+			return 0, fmt.Errorf("logrepl handler update: %w", err)
 		}
+		return lsn, nil
 	case *pglogrepl.DeleteMessage:
-		err := h.handleDelete(ctx, m, lsn)
-		if err != nil {
-			return fmt.Errorf("logrepl handler delete: %w", err)
+		if err := h.handleDelete(ctx, m, lsn); err != nil {
+			return 0, fmt.Errorf("logrepl handler delete: %w", err)
+		}
+		return lsn, nil
+	case *pglogrepl.BeginMessage:
+		h.lastTXLSN = m.FinalLSN
+	case *pglogrepl.CommitMessage:
+		if h.lastTXLSN != 0 && h.lastTXLSN != m.CommitLSN {
+			return 0, fmt.Errorf("out of order commit %s, expected %s", m.CommitLSN, h.lastTXLSN)
 		}
 	}
 
-	return nil
+	return 0, nil
 }
 
 // handleInsert formats a Record with INSERT event data from Postgres and sends
diff --git a/source/logrepl/internal/subscription.go b/source/logrepl/internal/subscription.go
index 7bc6abc..a3dc413 100644
--- a/source/logrepl/internal/subscription.go
+++ b/source/logrepl/internal/subscription.go
@@ -34,13 +34,14 @@ const (
 
 // Subscription manages a subscription to a logical replication slot.
 type Subscription struct {
-	SlotName      string
-	Publication   string
-	Tables        []string
-	StartLSN      pglogrepl.LSN
-	Handler       Handler
-	StatusTimeout time.Duration
-	TXSnapshotID  string
+	SlotName          string
+	Publication       string
+	Tables            []string
+	StartLSN          pglogrepl.LSN
+	ConsistentSlotLSN pglogrepl.LSN
+	Handler           Handler
+	StatusTimeout     time.Duration
+	TXSnapshotID      string
 
 	conn *pgconn.PgConn
 
@@ -50,11 +51,12 @@ type Subscription struct {
 	done    chan struct{}
 	doneErr error
 
-	walWritten pglogrepl.LSN
-	walFlushed pglogrepl.LSN
+	walWritten   pglogrepl.LSN
+	walFlushed   pglogrepl.LSN
+	serverWALEnd pglogrepl.LSN
 }
 
-type Handler func(context.Context, pglogrepl.Message, pglogrepl.LSN) error
+type Handler func(context.Context, pglogrepl.Message, pglogrepl.LSN) (pglogrepl.LSN, error)
 
 // CreateSubscription initializes the logical replication subscriber by creating the replication slot.
 func CreateSubscription(
@@ -66,6 +68,8 @@ func CreateSubscription(
 	startLSN pglogrepl.LSN,
 	h Handler,
 ) (*Subscription, error) {
+	var slotLSN pglogrepl.LSN
+
 	result, err := pglogrepl.CreateReplicationSlot(
 		ctx,
 		conn,
@@ -87,14 +91,26 @@ func CreateSubscription(
 			Msgf("replication slot %q already exists", slotName)
 	}
 
+	// If a ConsistentPoint is available use it, otherwise fall back to the startLSN
+	slotLSN, err = pglogrepl.ParseLSN(result.ConsistentPoint)
+	if err != nil {
+		if startLSN == 0 {
+			sdk.Logger(ctx).Warn().
+				Msg("start LSN is zero, using existing replication slot without position")
+		}
+
+		slotLSN = startLSN
+	}
+
 	return &Subscription{
-		SlotName:      slotName,
-		Publication:   publication,
-		Tables:        tables,
-		StartLSN:      startLSN,
-		Handler:       h,
-		StatusTimeout: 10 * time.Second,
-		TXSnapshotID:  result.SnapshotName,
+		SlotName:          slotName,
+		Publication:       publication,
+		Tables:            tables,
+		StartLSN:          startLSN,
+		ConsistentSlotLSN: slotLSN,
+		Handler:           h,
+		StatusTimeout:     10 * time.Second,
+		TXSnapshotID:      result.SnapshotName,
 
 		conn: conn,
 
@@ -158,13 +174,11 @@ func (s *Subscription) listen(ctx context.Context) error {
 
 		switch copyDataMsg.Data[0] {
 		case pglogrepl.PrimaryKeepaliveMessageByteID:
-			err := s.handlePrimaryKeepaliveMessage(ctx, copyDataMsg)
-			if err != nil {
+			if err := s.handlePrimaryKeepaliveMessage(ctx, copyDataMsg); err != nil {
 				return err
 			}
 		case pglogrepl.XLogDataByteID:
-			err := s.handleXLogData(ctx, copyDataMsg)
-			if err != nil {
+			if err := s.handleXLogData(ctx, copyDataMsg); err != nil {
 				return err
 			}
 		default:
@@ -184,11 +198,15 @@ func (s *Subscription) handlePrimaryKeepaliveMessage(ctx context.Context, copyDa
 	if err != nil {
 		return fmt.Errorf("failed to parse primary keepalive message: %w", err)
 	}
+
+	atomic.StoreUint64((*uint64)(&s.serverWALEnd), uint64(pkm.ServerWALEnd))
+
 	if pkm.ReplyRequested {
-		if err = s.sendStandbyStatusUpdate(ctx); err != nil {
+		if err := s.sendStandbyStatusUpdate(ctx); err != nil {
 			return fmt.Errorf("failed to send status: %w", err)
 		}
 	}
+
 	return nil
 }
 
@@ -210,13 +228,21 @@ func (s *Subscription) handleXLogData(ctx context.Context, copyDataMsg *pgproto3
 		return fmt.Errorf("invalid message: %w", err)
 	}
 
-	if err = s.Handler(ctx, logicalMsg, xld.WALStart); err != nil {
+	writtenLSN, err := s.Handler(ctx, logicalMsg, xld.WALStart)
+	if err != nil {
 		return fmt.Errorf("handler error: %w", err)
 	}
 
-	if xld.WALStart > 0 {
-		s.walWritten = xld.WALStart
+	// When starting on an existing slot without having a consistent slot LSN,
+	// set the flushed WAL LSN to just before the received LSN
+	if s.walWritten == 0 && s.ConsistentSlotLSN == 0 {
+		atomic.StoreUint64((*uint64)(&s.walFlushed), uint64(writtenLSN-1))
 	}
+
+	if writtenLSN > 0 {
+		s.walWritten = writtenLSN
+	}
+
 	return nil
 }
 
@@ -315,20 +341,40 @@ func (s *Subscription) sendStandbyStatusUpdate(ctx context.Context) error {
 		return fmt.Errorf("walWrite (%s) should be >= walFlush (%s)", s.walWritten, walFlushed)
 	}
 
+	// N.B. Manage replication slot lag, by responding with the last server LSN, when
+	// all previous slot relevant msgs have been written and flushed
+	replyWithWALEnd := walFlushed == s.walWritten && walFlushed < s.serverWALEnd
+
 	sdk.Logger(ctx).Trace().
-		Str("walWrite", s.walWritten.String()).
-		Str("walFlush", walFlushed.String()).
-		Str("walApply", walFlushed.String()).
+		Stringer("wal_write", s.walWritten).
+		Stringer("wal_flush", walFlushed).
+		Stringer("server_wal_end", s.serverWALEnd).
+		Bool("server_wal_end_sent", replyWithWALEnd).
 		Msg("sending standby status update")
 
-	err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
+	if replyWithWALEnd {
+		if err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
+			WALWritePosition: s.serverWALEnd,
+		}); err != nil {
+			return fmt.Errorf("failed to send standby status update with server end lsn: %w", err)
+		}
+
+		return nil
+	}
+
+	// No messages have been flushed, in this case set the flushed
+	// to just before the slot consistent LSN. This prevents flushed
+	// to be automatically assigned the written LSN by the pglogrepl pkg
+	if walFlushed == 0 {
+		walFlushed = s.ConsistentSlotLSN - 1
+	}
+
+	if err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
 		WALWritePosition: s.walWritten,
 		WALFlushPosition: walFlushed,
 		WALApplyPosition: walFlushed,
-		ClientTime:       time.Now(),
 		ReplyRequested:   false,
-	})
-	if err != nil {
+	}); err != nil {
 		return fmt.Errorf("failed to send standby status update: %w", err)
 	}
 
diff --git a/source/logrepl/internal/subscription_test.go b/source/logrepl/internal/subscription_test.go
index 5d5256d..b0e989a 100644
--- a/source/logrepl/internal/subscription_test.go
+++ b/source/logrepl/internal/subscription_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -46,7 +47,7 @@ func TestSubscription_WithRepmgr(t *testing.T) {
 	table1 := test.SetupTestTable(ctx, t, conn)
 	table2 := test.SetupTestTable(ctx, t, conn)
 
-	_, messages := setupSubscription(ctx, t, replConn, table1, table2)
+	sub, messages := setupSubscription(ctx, t, replConn, table1, table2)
 
 	fetchAndAssertMessageTypes := func(is *is.I, m chan pglogrepl.Message, msgTypes ...pglogrepl.MessageType) []pglogrepl.Message {
 		out := make([]pglogrepl.Message, len(msgTypes))
@@ -133,6 +134,17 @@ func TestSubscription_WithRepmgr(t *testing.T) {
 		)
 	})
 
+	t.Run("Last WAL written is behind keepalive", func(t *testing.T) {
+		is := is.New(t)
+		time.Sleep(2 * time.Second)
+
+		walFlushed := pglogrepl.LSN(atomic.LoadUint64((*uint64)(&sub.walFlushed)))
+		serverWALEnd := pglogrepl.LSN(atomic.LoadUint64((*uint64)(&sub.serverWALEnd)))
+
+		is.True(serverWALEnd >= sub.walWritten)
+		is.True(sub.walWritten > walFlushed)
+	})
+
 	t.Run("no more messages", func(t *testing.T) {
 		isNoMoreMessages(t, messages, time.Millisecond*500)
 	})
@@ -221,17 +233,19 @@ func setupSubscription(
 		publication,
 		tables,
 		0,
-		func(ctx context.Context, msg pglogrepl.Message, _ pglogrepl.LSN) error {
+		func(ctx context.Context, msg pglogrepl.Message, lsn pglogrepl.LSN) (pglogrepl.LSN, error) {
 			select {
 			case <-ctx.Done():
-				return ctx.Err()
+				return 0, ctx.Err()
 			case messages <- msg:
-				return nil
+				return lsn, nil
 			}
 		},
 	)
 	is.NoErr(err)
 
+	sub.StatusTimeout = 1 * time.Second
+
 	go func() {
 		err := sub.Run(ctx)
 		if !errors.Is(err, context.Canceled) {