Skip to content

Commit

Permalink
chore: vacuum tracked_users_reports table (#4948)
Browse files Browse the repository at this point in the history
* chore: vacuum tracked_users_reports table

* addressed comments
  • Loading branch information
mihir20 authored Aug 12, 2024
1 parent 2572016 commit ce5a009
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions enterprise/reporting/flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"net/http"
"time"

"github.com/lib/pq"

"github.com/rudderlabs/rudder-go-kit/bytesize"

"github.com/cenkalti/backoff"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -40,6 +44,7 @@ type Flusher struct {
batchSizeFromDB config.ValueLoader[int]
aggressiveFlushEnabled config.ValueLoader[bool]
lagThresholdForAggresiveFlushInMins config.ValueLoader[time.Duration]
vacuumThresholdBytes config.ValueLoader[int64]

reportingURL string
minConcurrentRequests config.ValueLoader[int]
Expand All @@ -51,6 +56,7 @@ type Flusher struct {
aggReportsTimer stats.Measurement
sendReportsTimer stats.Measurement
deleteReportsTimer stats.Measurement
vacuumReportsTimer stats.Measurement
concurrentRequests stats.Measurement
reqLatency stats.Measurement
reqCount stats.Measurement
Expand All @@ -71,6 +77,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C
batchSizeToReporting := conf.GetReloadableIntVar(10, 1, "Reporting.flusher.batchSizeToReporting")
aggressiveFlushEnabled := conf.GetReloadableBoolVar(false, "Reporting.flusher.aggressiveFlushEnabled")
lagThresholdForAggresiveFlushInMins := conf.GetReloadableDurationVar(5, time.Minute, "Reporting.flusher.lagThresholdForAggresiveFlushInMins")
vacuumThresholdBytes := conf.GetReloadableInt64Var(5*bytesize.GB, 1, "Reporting.flusher.vacuumThresholdBytes")

tr := &http.Transport{}
client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
Expand All @@ -87,6 +94,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C
maxConcurrentRequests: maxConcReqs,
stats: stats,
batchSizeFromDB: batchSizeFromDB,
vacuumThresholdBytes: vacuumThresholdBytes,
table: table,
aggregator: aggregator,
batchSizeToReporting: batchSizeToReporting,
Expand Down Expand Up @@ -125,6 +133,7 @@ func (f *Flusher) initStats(tags map[string]string) {

f.sendReportsTimer = f.stats.NewTaggedStat("reporting_flusher_send_reports_duration_seconds", stats.TimerType, tags)
f.deleteReportsTimer = f.stats.NewTaggedStat("reporting_flusher_delete_reports_duration_seconds", stats.TimerType, tags)
f.vacuumReportsTimer = f.stats.NewTaggedStat("reporting_flusher_vacuum_reports_duration_seconds", stats.TimerType, tags)

f.concurrentRequests = f.stats.NewTaggedStat("reporting_flusher_concurrent_requests_in_progress", stats.GaugeType, tags)
f.reqLatency = f.stats.NewTaggedStat("reporting_flusher_http_request_duration_seconds", stats.TimerType, tags)
Expand Down Expand Up @@ -217,6 +226,11 @@ func (f *Flusher) Flush(ctx context.Context) error {
}
f.deleteReportsTimer.Since(s)

// 5. Vacuum the table
if err := f.vacuum(ctx); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -303,6 +317,23 @@ func (f *Flusher) delete(ctx context.Context, minReportedAt, maxReportedAt time.
return err
}

func (f *Flusher) vacuum(ctx context.Context) error {
var sizeEstimate int64
if err := f.db.QueryRowContext(
ctx, `SELECT pg_table_size(oid) from pg_class where relname = $1`, f.table,
).Scan(&sizeEstimate); err != nil {
return fmt.Errorf("error getting table size %w", err)
}
if sizeEstimate > f.vacuumThresholdBytes.Load() {
vacuumStart := time.Now()
if _, err := f.db.ExecContext(ctx, fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(f.table))); err != nil {
return fmt.Errorf("error vacuuming table %w", err)
}
f.vacuumReportsTimer.Since(vacuumStart)
}
return nil
}

func (f *Flusher) makePOSTRequest(ctx context.Context, url string, payload interface{}) error {
payloadBytes, err := json.Marshal(payload)
if err != nil {
Expand Down

0 comments on commit ce5a009

Please sign in to comment.