Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: vacuum tracked_users_reports table #4948

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions enterprise/reporting/flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
"net/http"
"time"

"github.com/lib/pq"

"github.com/rudderlabs/rudder-go-kit/bytesize"

"github.com/cenkalti/backoff"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -40,6 +44,7 @@
batchSizeFromDB config.ValueLoader[int]
aggressiveFlushEnabled config.ValueLoader[bool]
lagThresholdForAggresiveFlushInMins config.ValueLoader[time.Duration]
vacuumThresholdBytes config.ValueLoader[int64]

reportingURL string
minConcurrentRequests config.ValueLoader[int]
Expand All @@ -51,6 +56,7 @@
aggReportsTimer stats.Measurement
sendReportsTimer stats.Measurement
deleteReportsTimer stats.Measurement
vacuumReportsTimer stats.Measurement
concurrentRequests stats.Measurement
reqLatency stats.Measurement
reqCount stats.Measurement
Expand All @@ -71,6 +77,7 @@
batchSizeToReporting := conf.GetReloadableIntVar(10, 1, "Reporting.flusher.batchSizeToReporting")
aggressiveFlushEnabled := conf.GetReloadableBoolVar(false, "Reporting.flusher.aggressiveFlushEnabled")
lagThresholdForAggresiveFlushInMins := conf.GetReloadableDurationVar(5, time.Minute, "Reporting.flusher.lagThresholdForAggresiveFlushInMins")
vacuumThresholdBytes := conf.GetReloadableInt64Var(5*bytesize.GB, 1, "Reporting.flusher.vacuumThresholdBytes")

tr := &http.Transport{}
client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
Expand All @@ -87,6 +94,7 @@
maxConcurrentRequests: maxConcReqs,
stats: stats,
batchSizeFromDB: batchSizeFromDB,
vacuumThresholdBytes: vacuumThresholdBytes,
table: table,
aggregator: aggregator,
batchSizeToReporting: batchSizeToReporting,
Expand Down Expand Up @@ -125,6 +133,7 @@

f.sendReportsTimer = f.stats.NewTaggedStat("reporting_flusher_send_reports_duration_seconds", stats.TimerType, tags)
f.deleteReportsTimer = f.stats.NewTaggedStat("reporting_flusher_delete_reports_duration_seconds", stats.TimerType, tags)
f.vacuumReportsTimer = f.stats.NewTaggedStat("reporting_flusher_vacuum_reports_duration_seconds", stats.TimerType, tags)

f.concurrentRequests = f.stats.NewTaggedStat("reporting_flusher_concurrent_requests_in_progress", stats.GaugeType, tags)
f.reqLatency = f.stats.NewTaggedStat("reporting_flusher_http_request_duration_seconds", stats.TimerType, tags)
Expand Down Expand Up @@ -217,6 +226,11 @@
}
f.deleteReportsTimer.Since(s)

// 5. Vacuum the table
if err := f.vacuum(ctx); err != nil {
return err

Check warning on line 231 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L231

Added line #L231 was not covered by tests
}

return nil
}

Expand Down Expand Up @@ -303,6 +317,23 @@
return err
}

func (f *Flusher) vacuum(ctx context.Context) error {
var sizeEstimate int64
if err := f.db.QueryRowContext(
ctx, `SELECT pg_table_size(oid) from pg_class where relname = $1`, f.table,
).Scan(&sizeEstimate); err != nil {
return fmt.Errorf("error getting table size %w", err)

Check warning on line 325 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L325

Added line #L325 was not covered by tests
}
if sizeEstimate > f.vacuumThresholdBytes.Load() {
vacuumStart := time.Now()
if _, err := f.db.ExecContext(ctx, fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(f.table))); err != nil {
return fmt.Errorf("error vacuuming table %w", err)

Check warning on line 330 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L328-L330

Added lines #L328 - L330 were not covered by tests
}
f.vacuumReportsTimer.Since(vacuumStart)

Check warning on line 332 in enterprise/reporting/flusher/flusher.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/flusher/flusher.go#L332

Added line #L332 was not covered by tests
}
return nil
}

func (f *Flusher) makePOSTRequest(ctx context.Context, url string, payload interface{}) error {
payloadBytes, err := json.Marshal(payload)
if err != nil {
Expand Down