Skip to content

Commit

Permalink
Send heartbeats immedately when pulling messages
Browse files Browse the repository at this point in the history
This ensures there's some DB activity immediately after connecting to
stream WAL changes.  For some hosts, this is required.
  • Loading branch information
tonyhb committed Oct 16, 2024
1 parent 8fe5f77 commit c0d1b53
Showing 1 changed file with 52 additions and 20 deletions.
72 changes: 52 additions & 20 deletions pkg/replicator/pgreplicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) {
if opts.Log == nil {
opts.Log = slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
})).With("host", opts.Config.Host)
}

cfg := opts.Config
Expand Down Expand Up @@ -256,6 +256,8 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
return err
}

p.log.Debug("connected to replication slot")

// Postgres batches every individual insert, update, etc. within a BEGIN/COMMIT message.
// This is great for replication. However, for Inngest events, we don't want superflous begin
// or commit messages as events.
Expand All @@ -268,64 +270,91 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
go func() {
if p.version < pgconsts.MessagesVersion {
// doesn't support wal messages; ignore.
p.log.Debug("heartbeat not supported", "pg_version", p.version)
return
}

// Send a hearbeat immediately.
if err := p.heartbeat(ctx); err != nil {
p.log.Warn("unable to emit immediate heartbeat", "error", err)
}

t := time.NewTicker(p.heartbeatTime)
for range t.C {
if p.queryConn.IsClosed() {
if err := p.createQueryConn(ctx); err != nil {
p.log.Error("error reconnecting for heartbeat", "error", err, "host", p.opts.Config.Host)
doneCheck := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return
case <-doneCheck.C:
// Check for the stopped signal internally every second. This lets us log
// the stopped message relatively close to the stop signal occurring.
if atomic.LoadInt32(&p.stopped) == 1 {
p.log.Debug("stopping heartbeat", "ctx_err", ctx.Err(), "stopped", atomic.LoadInt32(&p.stopped))
return
}
}

if atomic.LoadInt32(&p.stopped) == 1 || ctx.Err() != nil {
return
}

// Send a hearbeat every minute
p.queryLock.Lock()
_, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);")
p.queryLock.Unlock()

if isConnClosedErr(err) && p.queryConn.IsClosed() {
continue
}
case <-t.C:
if p.queryConn.IsClosed() {
if err := p.createQueryConn(ctx); err != nil {
p.log.Error("error reconnecting for heartbeat", "error", err)
return
}
}

// Send a hearbeat every minute
err := p.heartbeat(ctx)
if err != nil {
p.log.Warn("unable to emit heartbeat", "error", err)
continue
}

if err != nil {
p.log.Warn("unable to emit heartbeat", "error", err, "host", p.opts.Config.Host)
p.log.Debug("sent heartbeat", "error", err)
}
}
}()

for {
if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 || p.conn.IsClosed() {
// Always call Close automatically.
p.log.Debug("stopping cdc connection", "conn_closed", p.conn.IsClosed())
p.Close(ctx)
return nil
}

changes, err := p.fetch(ctx)
if err != nil {
p.log.Warn("error pulling messages", "error", err)
return err
}
if changes == nil {
p.log.Debug("no messages pulled")
continue
}

if changes.Operation == changeset.OperationHeartbeat {
p.log.Debug("heartbeat pulled")
p.Commit(changes.Watermark)
if err := p.forceNextReport(ctx); err != nil {
p.log.Warn("unable to report lsn on heartbeat", "error", err, "host", p.opts.Config.Host)
}
continue
}

p.log.Debug("message pulled", "op", changes.Operation)

unwrapper.Process(changes)
}
}

func (p *pg) heartbeat(ctx context.Context) error {
// Send a hearbeat every minute
p.queryLock.Lock()
_, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);")
p.queryLock.Unlock()
return err

}

func (p *pg) fetch(ctx context.Context) (*changeset.Changeset, error) {
var err error

Expand Down Expand Up @@ -457,6 +486,9 @@ func (p *pg) report(ctx context.Context, forceReply bool) error {
if lsn == 0 {
return nil
}

p.log.Debug("reporting lsn to source", "lsn", p.LSN().String())

err := pglogrepl.SendStandbyStatusUpdate(ctx,
p.conn.PgConn(),
pglogrepl.StandbyStatusUpdate{
Expand Down

0 comments on commit c0d1b53

Please sign in to comment.