Skip to content

Commit

Permalink
add extra log metrics for snapshot progress (#162)
Browse files Browse the repository at this point in the history
* Add extra log metrics for logrepl
  • Loading branch information
lyuboxa authored Jun 13, 2024
1 parent 9b84f73 commit 66d545c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
2 changes: 1 addition & 1 deletion source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (i *CDCIterator) StartSubscriber(ctx context.Context) error {
sdk.Logger(ctx).Info().
Str("slot", i.config.SlotName).
Str("publication", i.config.PublicationName).
Msg("Starting logical replication")
Msgf("Starting logical replication at %s", i.sub.StartLSN)

go func() {
if err := i.sub.Run(ctx); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions source/snapshot/fetch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,14 @@ func (f *FetchWorker) Run(ctx context.Context) error {
Int("rows", nfetched).
Str("table", f.conf.Table).
Dur("elapsed", time.Since(start)).
Str("completed_perc", fmt.Sprintf("%.2f", (float64(nfetched)/float64(f.snapshotEnd))*100)).
Str("rate_per_min", fmt.Sprintf("%.0f", float64(nfetched)/time.Since(start).Minutes())).
Msg("fetching rows")
}

sdk.Logger(ctx).Info().
Dur("elapsed", time.Since(start)).
Str("rate_per_min", fmt.Sprintf("%.0f", float64(nfetched)/time.Since(start).Minutes())).
Str("table", f.conf.Table).
Msgf("%q snapshot completed", f.conf.Table)

Expand Down Expand Up @@ -254,12 +257,18 @@ func (f *FetchWorker) updateSnapshotEnd(ctx context.Context, tx pgx.Tx) error {
}

func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) {
start := time.Now().UTC()

rows, err := tx.Query(ctx, fmt.Sprintf("FETCH %d FROM %s", f.conf.FetchSize, f.cursorName))
if err != nil {
return 0, fmt.Errorf("failed to fetch rows: %w", err)
}
defer rows.Close()

sdk.Logger(ctx).Info().
Dur("fetch_elapsed", time.Since(start)).
Msg("cursor fetched data")

var fields []string
for _, f := range rows.FieldDescriptions() {
fields = append(fields, f.Name)
Expand Down Expand Up @@ -292,6 +301,13 @@ func (f *FetchWorker) fetch(ctx context.Context, tx pgx.Tx) (int, error) {
}

func (f *FetchWorker) send(ctx context.Context, d FetchData) error {
start := time.Now().UTC()
defer func() {
sdk.Logger(ctx).Trace().
Dur("send_elapsed", time.Since(start)).
Msg("sending data to chan")
}()

select {
case <-ctx.Done():
return ctx.Err()
Expand Down

0 comments on commit 66d545c

Please sign in to comment.