Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of keepalive messages and manage server end lag. #167

Merged
merged 8 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ build:
.PHONY: test
test:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose.yml up --quiet-pull -d --wait
docker compose -f test/docker-compose.yml up --force-recreate --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race ./...; ret=$$?; \
docker compose -f test/docker-compose.yml down --volumes; \
exit $$ret
Expand Down
68 changes: 68 additions & 0 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/matryer/is"
Expand Down Expand Up @@ -274,6 +275,48 @@ func TestCDCIterator_Next_Fail(t *testing.T) {
})
}

// TestCDCIterator_EnsureLSN verifies the LSNs the server reports for the
// replication slot before and after a record is acknowledged.
//
// Before the ack, the reported write LSN must equal the LSN of the record that
// was read, while the flush LSN must still be behind it. After the ack and a
// couple of status updates, both the write and the flush LSN must have reached
// the record LSN and must be equal to each other.
func TestCDCIterator_EnsureLSN(t *testing.T) {
	is := is.New(t)
	ctx := context.Background()

	pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
	table := test.SetupTestTable(ctx, t, pool)

	iter := testCDCIterator(ctx, t, pool, table, true)
	<-iter.sub.Ready()

	// Produce a single change for the iterator to pick up.
	insertStmt := fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
VALUES (6, 'bizz', 456, false, 12.3, 14)`, table)
	_, err := pool.Exec(ctx, insertStmt)
	is.NoErr(err)

	rec, err := iter.Next(ctx)
	is.NoErr(err)

	// Extract the LSN carried by the position of the record.
	pos, err := position.ParseSDKPosition(rec.Position)
	is.NoErr(err)

	recordLSN, err := pos.LSN()
	is.NoErr(err)

	// The replication slot is named after the table.
	slotName := table

	// Record was read but not acked: written, not yet flushed.
	writtenBefore, flushedBefore, err := fetchSlotStats(t, pool, slotName)
	is.NoErr(err)

	is.Equal(recordLSN, writtenBefore)
	is.True(recordLSN > flushedBefore)

	is.NoErr(iter.Ack(ctx, rec.Position))
	time.Sleep(2 * time.Second) // wait for at least two status updates

	// Record was acked: the flush position has caught up with the write position.
	writtenAfter, flushedAfter, err := fetchSlotStats(t, pool, slotName)
	is.NoErr(err)

	is.True(writtenAfter >= recordLSN)
	is.True(flushedAfter >= recordLSN)
	is.Equal(writtenAfter, flushedAfter)
}

func TestCDCIterator_Ack(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -346,6 +389,8 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
i, err := NewCDCIterator(ctx, &pool.Config().ConnConfig.Config, config)
is.NoErr(err)

i.sub.StatusTimeout = 1 * time.Second

if start {
is.NoErr(i.StartSubscriber(ctx))
}
Expand All @@ -361,3 +406,26 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl

return i
}

// fetchSlotStats returns the write and flush LSNs that pg_stat_replication
// reports for the connection attached to the replication slot slotName.
//
// The query is retried every 100ms until it succeeds, because it fails with
// an error for as long as no matching row can be scanned. Polling stops after
// 15 seconds at the latest; in that case the returned error wraps the context
// error and includes the last query error for diagnostics.
func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LSN, pglogrepl.LSN, error) {
	t.Helper()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
	defer cancel()

	// The query is the same for every attempt, so build it only once.
	query := fmt.Sprintf(`SELECT write_lsn, flush_lsn
FROM pg_stat_replication s JOIN pg_replication_slots rs ON s.pid = rs.active_pid
WHERE rs.slot_name = '%s'`, slotName)

	for {
		var writeLSN, flushLSN pglogrepl.LSN

		err := c.QueryRow(ctx, query).Scan(&writeLSN, &flushLSN)
		if err == nil {
			return writeLSN, flushLSN, nil
		}
		if errors.Is(err, context.DeadlineExceeded) {
			return 0, 0, err
		}

		// Wait before the next attempt, but stop as soon as the context is
		// done. Checking the context itself (instead of relying only on the
		// shape of the query error) guarantees the loop terminates even if
		// the driver reports the expired context with a different error.
		select {
		case <-ctx.Done():
			return 0, 0, fmt.Errorf(
				"fetching stats for replication slot %q: %w (last query error: %v)",
				slotName, ctx.Err(), err,
			)
		case <-time.After(100 * time.Millisecond):
		}
	}
}
1 change: 0 additions & 1 deletion source/logrepl/combined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func TestCombinedIterator_Next(t *testing.T) {

t.Run("next_connector_resume_cdc_6", func(t *testing.T) {
is := is.New(t)

i, err := NewCombinedIterator(ctx, pool, Config{
Position: lastPos,
Tables: []string{table},
Expand Down
33 changes: 20 additions & 13 deletions source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CDCHandler struct {
tableKeys map[string]string
relationSet *internal.RelationSet
out chan<- sdk.Record
lastTXLSN pglogrepl.LSN
}

func NewCDCHandler(
Expand All @@ -45,35 +46,41 @@ func NewCDCHandler(
}

// Handle is the handler function that receives all logical replication messages.
func (h *CDCHandler) Handle(ctx context.Context, m pglogrepl.Message, lsn pglogrepl.LSN) error {
// Returns non-zero LSN when a record was emitted for the message.
func (h *CDCHandler) Handle(ctx context.Context, m pglogrepl.Message, lsn pglogrepl.LSN) (pglogrepl.LSN, error) {
sdk.Logger(ctx).Trace().
Str("lsn", lsn.String()).
Str("messageType", m.Type().String()).
Msg("handler received pglogrepl.Message")

switch m := m.(type) {
case *pglogrepl.RelationMessage:
// We have to add the Relations to our Set so that we can
// decode our own output
// We have to add the Relations to our Set so that we can decode our own output
h.relationSet.Add(m)
case *pglogrepl.InsertMessage:
err := h.handleInsert(ctx, m, lsn)
if err != nil {
return fmt.Errorf("logrepl handler insert: %w", err)
if err := h.handleInsert(ctx, m, lsn); err != nil {
return 0, fmt.Errorf("logrepl handler insert: %w", err)
}
return lsn, nil
case *pglogrepl.UpdateMessage:
err := h.handleUpdate(ctx, m, lsn)
if err != nil {
return fmt.Errorf("logrepl handler update: %w", err)
if err := h.handleUpdate(ctx, m, lsn); err != nil {
return 0, fmt.Errorf("logrepl handler update: %w", err)
}
return lsn, nil
case *pglogrepl.DeleteMessage:
err := h.handleDelete(ctx, m, lsn)
if err != nil {
return fmt.Errorf("logrepl handler delete: %w", err)
if err := h.handleDelete(ctx, m, lsn); err != nil {
return 0, fmt.Errorf("logrepl handler delete: %w", err)
}
return lsn, nil
case *pglogrepl.BeginMessage:
h.lastTXLSN = m.FinalLSN
case *pglogrepl.CommitMessage:
if h.lastTXLSN != 0 && h.lastTXLSN != m.CommitLSN {
return 0, fmt.Errorf("out of order commit %s, expected %s", m.CommitLSN, h.lastTXLSN)
}
}

return nil
return 0, nil
}

// handleInsert formats a Record with INSERT event data from Postgres and sends
Expand Down
110 changes: 78 additions & 32 deletions source/logrepl/internal/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ const (

// Subscription manages a subscription to a logical replication slot.
type Subscription struct {
SlotName string
Publication string
Tables []string
StartLSN pglogrepl.LSN
Handler Handler
StatusTimeout time.Duration
TXSnapshotID string
SlotName string
Publication string
Tables []string
StartLSN pglogrepl.LSN
ConsistentSlotLSN pglogrepl.LSN
Handler Handler
StatusTimeout time.Duration
TXSnapshotID string

conn *pgconn.PgConn

Expand All @@ -50,11 +51,12 @@ type Subscription struct {
done chan struct{}
doneErr error

walWritten pglogrepl.LSN
walFlushed pglogrepl.LSN
walWritten pglogrepl.LSN
walFlushed pglogrepl.LSN
serverWALEnd pglogrepl.LSN
}

type Handler func(context.Context, pglogrepl.Message, pglogrepl.LSN) error
type Handler func(context.Context, pglogrepl.Message, pglogrepl.LSN) (pglogrepl.LSN, error)

// CreateSubscription initializes the logical replication subscriber by creating the replication slot.
func CreateSubscription(
Expand All @@ -66,6 +68,8 @@ func CreateSubscription(
startLSN pglogrepl.LSN,
h Handler,
) (*Subscription, error) {
var slotLSN pglogrepl.LSN

result, err := pglogrepl.CreateReplicationSlot(
ctx,
conn,
Expand All @@ -87,14 +91,26 @@ func CreateSubscription(
Msgf("replication slot %q already exists", slotName)
}

// If a ConsistentPoint is available use it, otherwise fall back to the startLSN
slotLSN, err = pglogrepl.ParseLSN(result.ConsistentPoint)
if err != nil {
if startLSN == 0 {
sdk.Logger(ctx).Warn().
Msg("start LSN is zero, using existing replication slot without position")
}

slotLSN = startLSN
}

return &Subscription{
SlotName: slotName,
Publication: publication,
Tables: tables,
StartLSN: startLSN,
Handler: h,
StatusTimeout: 10 * time.Second,
TXSnapshotID: result.SnapshotName,
SlotName: slotName,
Publication: publication,
Tables: tables,
StartLSN: startLSN,
ConsistentSlotLSN: slotLSN,
Handler: h,
StatusTimeout: 10 * time.Second,
TXSnapshotID: result.SnapshotName,

conn: conn,

Expand Down Expand Up @@ -158,13 +174,11 @@ func (s *Subscription) listen(ctx context.Context) error {

switch copyDataMsg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
err := s.handlePrimaryKeepaliveMessage(ctx, copyDataMsg)
if err != nil {
if err := s.handlePrimaryKeepaliveMessage(ctx, copyDataMsg); err != nil {
return err
}
case pglogrepl.XLogDataByteID:
err := s.handleXLogData(ctx, copyDataMsg)
if err != nil {
if err := s.handleXLogData(ctx, copyDataMsg); err != nil {
return err
}
default:
Expand All @@ -184,11 +198,15 @@ func (s *Subscription) handlePrimaryKeepaliveMessage(ctx context.Context, copyDa
if err != nil {
return fmt.Errorf("failed to parse primary keepalive message: %w", err)
}

atomic.StoreUint64((*uint64)(&s.serverWALEnd), uint64(pkm.ServerWALEnd))
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved

if pkm.ReplyRequested {
if err = s.sendStandbyStatusUpdate(ctx); err != nil {
if err := s.sendStandbyStatusUpdate(ctx); err != nil {
return fmt.Errorf("failed to send status: %w", err)
}
}

return nil
}

Expand All @@ -210,13 +228,21 @@ func (s *Subscription) handleXLogData(ctx context.Context, copyDataMsg *pgproto3
return fmt.Errorf("invalid message: %w", err)
}

if err = s.Handler(ctx, logicalMsg, xld.WALStart); err != nil {
writtenLSN, err := s.Handler(ctx, logicalMsg, xld.WALStart)
if err != nil {
return fmt.Errorf("handler error: %w", err)
}

if xld.WALStart > 0 {
s.walWritten = xld.WALStart
// When starting on an existing slot without having a consistent slot LSN,
// set the flushed WAL LSN to just before the received LSN
if s.walWritten == 0 && s.ConsistentSlotLSN == 0 {
atomic.StoreUint64((*uint64)(&s.walFlushed), uint64(writtenLSN-1))
}

if writtenLSN > 0 {
s.walWritten = writtenLSN
}

return nil
}

Expand Down Expand Up @@ -315,20 +341,40 @@ func (s *Subscription) sendStandbyStatusUpdate(ctx context.Context) error {
return fmt.Errorf("walWrite (%s) should be >= walFlush (%s)", s.walWritten, walFlushed)
}

// N.B. Manage replication slot lag, by responding with the last server LSN, when
// all previous slot relevant msgs have been written and flushed
replyWithWALEnd := walFlushed == s.walWritten && walFlushed < s.serverWALEnd

sdk.Logger(ctx).Trace().
Str("walWrite", s.walWritten.String()).
Str("walFlush", walFlushed.String()).
Str("walApply", walFlushed.String()).
Stringer("wal_write", s.walWritten).
Stringer("wal_flush", walFlushed).
Stringer("server_wal_end", s.serverWALEnd).
Bool("server_wal_end_sent", replyWithWALEnd).
Msg("sending standby status update")

err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
if replyWithWALEnd {
if err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved
WALWritePosition: s.serverWALEnd,
}); err != nil {
return fmt.Errorf("failed to send standby status update with server end lsn: %w", err)
}

return nil
}

// No messages have been flushed, in this case set the flushed
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved
// to just before the slot consistent LSN. This prevents flushed
// to be automatically assigned the written LSN by the pglogrepl pkg
if walFlushed == 0 {
walFlushed = s.ConsistentSlotLSN - 1
}

if err := pglogrepl.SendStandbyStatusUpdate(ctx, s.conn, pglogrepl.StandbyStatusUpdate{
WALWritePosition: s.walWritten,
WALFlushPosition: walFlushed,
WALApplyPosition: walFlushed,
ClientTime: time.Now(),
ReplyRequested: false,
})
if err != nil {
}); err != nil {
return fmt.Errorf("failed to send standby status update: %w", err)
}

Expand Down
Loading