From ce5a009408d2108dc743fd9a27d155438448907a Mon Sep 17 00:00:00 2001 From: Mihir Gandhi Date: Mon, 12 Aug 2024 16:17:18 +0530 Subject: [PATCH] chore: vacuum tracked_users_reports table (#4948) * chore: vacuum tracked_users_reports table * addressed comments --- enterprise/reporting/flusher/flusher.go | 31 +++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/enterprise/reporting/flusher/flusher.go b/enterprise/reporting/flusher/flusher.go index 7fff34ee55..16a8d8e78a 100644 --- a/enterprise/reporting/flusher/flusher.go +++ b/enterprise/reporting/flusher/flusher.go @@ -10,6 +10,10 @@ import ( "net/http" "time" + "github.com/lib/pq" + + "github.com/rudderlabs/rudder-go-kit/bytesize" + "github.com/cenkalti/backoff" "golang.org/x/sync/errgroup" @@ -40,6 +44,7 @@ type Flusher struct { batchSizeFromDB config.ValueLoader[int] aggressiveFlushEnabled config.ValueLoader[bool] lagThresholdForAggresiveFlushInMins config.ValueLoader[time.Duration] + vacuumThresholdBytes config.ValueLoader[int64] reportingURL string minConcurrentRequests config.ValueLoader[int] @@ -51,6 +56,7 @@ type Flusher struct { aggReportsTimer stats.Measurement sendReportsTimer stats.Measurement deleteReportsTimer stats.Measurement + vacuumReportsTimer stats.Measurement concurrentRequests stats.Measurement reqLatency stats.Measurement reqCount stats.Measurement @@ -71,6 +77,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C batchSizeToReporting := conf.GetReloadableIntVar(10, 1, "Reporting.flusher.batchSizeToReporting") aggressiveFlushEnabled := conf.GetReloadableBoolVar(false, "Reporting.flusher.aggressiveFlushEnabled") lagThresholdForAggresiveFlushInMins := conf.GetReloadableDurationVar(5, time.Minute, "Reporting.flusher.lagThresholdForAggresiveFlushInMins") + vacuumThresholdBytes := conf.GetReloadableInt64Var(5*bytesize.GB, 1, "Reporting.flusher.vacuumThresholdBytes") tr := &http.Transport{} client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)} @@ -87,6 +94,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C maxConcurrentRequests: maxConcReqs, stats: stats, batchSizeFromDB: batchSizeFromDB, + vacuumThresholdBytes: vacuumThresholdBytes, table: table, aggregator: aggregator, batchSizeToReporting: batchSizeToReporting, @@ -125,6 +133,7 @@ func (f *Flusher) initStats(tags map[string]string) { f.sendReportsTimer = f.stats.NewTaggedStat("reporting_flusher_send_reports_duration_seconds", stats.TimerType, tags) f.deleteReportsTimer = f.stats.NewTaggedStat("reporting_flusher_delete_reports_duration_seconds", stats.TimerType, tags) + f.vacuumReportsTimer = f.stats.NewTaggedStat("reporting_flusher_vacuum_reports_duration_seconds", stats.TimerType, tags) f.concurrentRequests = f.stats.NewTaggedStat("reporting_flusher_concurrent_requests_in_progress", stats.GaugeType, tags) f.reqLatency = f.stats.NewTaggedStat("reporting_flusher_http_request_duration_seconds", stats.TimerType, tags) @@ -217,6 +226,11 @@ func (f *Flusher) Flush(ctx context.Context) error { } f.deleteReportsTimer.Since(s) + // 5. Vacuum the table + if err := f.vacuum(ctx); err != nil { + return err + } + return nil } @@ -303,6 +317,23 @@ func (f *Flusher) delete(ctx context.Context, minReportedAt, maxReportedAt time. return err } +func (f *Flusher) vacuum(ctx context.Context) error { + var sizeEstimate int64 + if err := f.db.QueryRowContext( + ctx, `SELECT pg_table_size(oid) from pg_class where relname = $1`, f.table, + ).Scan(&sizeEstimate); err != nil { + return fmt.Errorf("error getting table size %w", err) + } + if sizeEstimate > f.vacuumThresholdBytes.Load() { + vacuumStart := time.Now() + if _, err := f.db.ExecContext(ctx, fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(f.table))); err != nil { + return fmt.Errorf("error vacuuming table %w", err) + } + f.vacuumReportsTimer.Since(vacuumStart) + } + return nil +} + func (f *Flusher) makePOSTRequest(ctx context.Context, url string, payload interface{}) error { payloadBytes, err := json.Marshal(payload) if err != nil {