Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send heartbeats immediately when pulling messages #5

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 53 additions & 21 deletions pkg/replicator/pgreplicator/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func New(ctx context.Context, opts Opts) (PostgresReplicator, error) {
if opts.Log == nil {
opts.Log = slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
})).With("host", opts.Config.Host)
}

cfg := opts.Config
Expand Down Expand Up @@ -256,6 +256,8 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
return err
}

p.log.Debug("connected to replication slot")

// Postgres batches every individual insert, update, etc. within a BEGIN/COMMIT message.
// This is great for replication. However, for Inngest events, we don't want superfluous begin
// or commit messages as events.
Expand All @@ -268,64 +270,91 @@ func (p *pg) Pull(ctx context.Context, cc chan *changeset.Changeset) error {
go func() {
if p.version < pgconsts.MessagesVersion {
// doesn't support wal messages; ignore.
p.log.Debug("heartbeat not supported", "pg_version", p.version)
return
}

// Send a heartbeat immediately.
if err := p.heartbeat(ctx); err != nil {
p.log.Warn("unable to emit immediate heartbeat", "error", err)
}

t := time.NewTicker(p.heartbeatTime)
for range t.C {
if p.queryConn.IsClosed() {
if err := p.createQueryConn(ctx); err != nil {
p.log.Error("error reconnecting for heartbeat", "error", err, "host", p.opts.Config.Host)
doneCheck := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return
case <-doneCheck.C:
// Check for the stopped signal internally every second. This lets us log
// the stopped message relatively close to the stop signal occurring.
if atomic.LoadInt32(&p.stopped) == 1 {
p.log.Debug("stopping heartbeat", "ctx_err", ctx.Err(), "stopped", atomic.LoadInt32(&p.stopped))
return
}
}

if atomic.LoadInt32(&p.stopped) == 1 || ctx.Err() != nil {
return
}

// Send a heartbeat every minute
p.queryLock.Lock()
_, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);")
p.queryLock.Unlock()

if isConnClosedErr(err) && p.queryConn.IsClosed() {
continue
}
case <-t.C:
if p.queryConn.IsClosed() {
if err := p.createQueryConn(ctx); err != nil {
p.log.Error("error reconnecting for heartbeat", "error", err)
return
}
}

// Send a heartbeat every minute
err := p.heartbeat(ctx)
if err != nil {
p.log.Warn("unable to emit heartbeat", "error", err)
continue
}

if err != nil {
p.log.Warn("unable to emit heartbeat", "error", err, "host", p.opts.Config.Host)
p.log.Debug("sent heartbeat", "error", err)
}
}
}()

for {
if ctx.Err() != nil || atomic.LoadInt32(&p.stopped) == 1 || p.conn.IsClosed() {
// Always call Close automatically.
p.log.Debug("stopping cdc connection", "conn_closed", p.conn.IsClosed())
p.Close(ctx)
return nil
}

changes, err := p.fetch(ctx)
if err != nil {
p.log.Warn("error pulling messages", "error", err)
return err
}
if changes == nil {
p.log.Debug("no messages pulled")
continue
}

if changes.Operation == changeset.OperationHeartbeat {
p.log.Debug("heartbeat pulled")
p.Commit(changes.Watermark)
if err := p.forceNextReport(ctx); err != nil {
p.log.Warn("unable to report lsn on heartbeat", "error", err, "host", p.opts.Config.Host)
}
continue
}

p.log.Debug("message pulled", "op", changes.Operation)

unwrapper.Process(changes)
}
}

// heartbeat emits a single heartbeat over the query connection by calling
// pg_logical_emit_message with the prefix 'heartbeat' and the current
// database time as the payload. The first argument (false) requests a
// non-transactional message.
//
// The call is made while holding p.queryLock. The lock is released via defer
// so that it is not left held if Exec panics. Scheduling (how often a
// heartbeat is sent) is the caller's responsibility; this method sends
// exactly one message per call.
//
// It returns whatever error Exec reports, or nil on success.
func (p *pg) heartbeat(ctx context.Context) error {
	p.queryLock.Lock()
	defer p.queryLock.Unlock()

	_, err := p.queryConn.Exec(ctx, "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);")
	return err
}

func (p *pg) fetch(ctx context.Context) (*changeset.Changeset, error) {
var err error

Expand Down Expand Up @@ -457,6 +486,9 @@ func (p *pg) report(ctx context.Context, forceReply bool) error {
if lsn == 0 {
return nil
}

p.log.Debug("reporting lsn to source", "lsn", p.LSN().String())

err := pglogrepl.SendStandbyStatusUpdate(ctx,
p.conn.PgConn(),
pglogrepl.StandbyStatusUpdate{
Expand Down Expand Up @@ -533,6 +565,6 @@ func standardizeErr(err error) (bool, error) {
return false, err
}

func isConnClosedErr(err error) bool {
// IsConnClosedErr reports whether err is non-nil and its message contains
// the substring "conn closed".
func IsConnClosedErr(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "conn closed")
}
Loading