diff --git a/enterprise/reporting/flusher/flusher.go b/enterprise/reporting/flusher/flusher.go
index 7fff34ee552..d570e0d13dc 100644
--- a/enterprise/reporting/flusher/flusher.go
+++ b/enterprise/reporting/flusher/flusher.go
@@ -10,6 +10,8 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/rudderlabs/rudder-go-kit/bytesize"
+
 	"github.com/cenkalti/backoff"
 	"golang.org/x/sync/errgroup"
 
@@ -40,6 +42,7 @@ type Flusher struct {
 	batchSizeFromDB                      config.ValueLoader[int]
 	aggressiveFlushEnabled               config.ValueLoader[bool]
 	lagThresholdForAggresiveFlushInMins  config.ValueLoader[time.Duration]
+	vacuumThresholdBytes                 config.ValueLoader[int64]
 
 	reportingURL          string
 	minConcurrentRequests config.ValueLoader[int]
@@ -51,6 +54,7 @@ type Flusher struct {
 	aggReportsTimer     stats.Measurement
 	sendReportsTimer    stats.Measurement
 	deleteReportsTimer  stats.Measurement
+	vacuumReportsTimer  stats.Measurement
 	concurrentRequests  stats.Measurement
 	reqLatency          stats.Measurement
 	reqCount            stats.Measurement
@@ -71,6 +75,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C
 	batchSizeToReporting := conf.GetReloadableIntVar(10, 1, "Reporting.flusher.batchSizeToReporting")
 	aggressiveFlushEnabled := conf.GetReloadableBoolVar(false, "Reporting.flusher.aggressiveFlushEnabled")
 	lagThresholdForAggresiveFlushInMins := conf.GetReloadableDurationVar(5, time.Minute, "Reporting.flusher.lagThresholdForAggresiveFlushInMins")
+	vacuumThresholdBytes := conf.GetReloadableInt64Var(5*bytesize.GB, 1, "Reporting.flusher.vacuumThresholdBytes")
 
 	tr := &http.Transport{}
 	client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
@@ -87,6 +92,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C
 		maxConcurrentRequests: maxConcReqs,
 		stats:                 stats,
 		batchSizeFromDB:       batchSizeFromDB,
+		vacuumThresholdBytes:  vacuumThresholdBytes,
 		table:                 table,
 		aggregator:            aggregator,
 		batchSizeToReporting:  batchSizeToReporting,
@@ -125,6 +131,7 @@ func (f *Flusher) initStats(tags map[string]string) {
 	f.sendReportsTimer = f.stats.NewTaggedStat("reporting_flusher_send_reports_duration_seconds", stats.TimerType, tags)
 
 	f.deleteReportsTimer = f.stats.NewTaggedStat("reporting_flusher_delete_reports_duration_seconds", stats.TimerType, tags)
+	f.vacuumReportsTimer = f.stats.NewTaggedStat("reporting_flusher_vacuum_reports_duration_seconds", stats.TimerType, tags)
 
 	f.concurrentRequests = f.stats.NewTaggedStat("reporting_flusher_concurrent_requests_in_progress", stats.GaugeType, tags)
 	f.reqLatency = f.stats.NewTaggedStat("reporting_flusher_http_request_duration_seconds", stats.TimerType, tags)
@@ -217,6 +224,11 @@ func (f *Flusher) Flush(ctx context.Context) error {
 	}
 	f.deleteReportsTimer.Since(s)
 
+	// 5. Vacuum the table
+	if err := f.vacuum(ctx); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -303,6 +315,23 @@ func (f *Flusher) delete(ctx context.Context, minReportedAt, maxReportedAt time.
 	return err
 }
 
+func (f *Flusher) vacuum(ctx context.Context) error {
+	vacuumStart := time.Now()
+	var sizeEstimate int64
+	if err := f.db.QueryRowContext(
+		ctx, fmt.Sprintf(`SELECT pg_table_size(oid) from pg_class where relname='%s'`, f.table),
+	).Scan(&sizeEstimate); err != nil {
+		return fmt.Errorf("error getting table size %w", err)
+	}
+	if sizeEstimate > f.vacuumThresholdBytes.Load() {
+		if _, err := f.db.ExecContext(ctx, fmt.Sprintf("vacuum full analyse %s", f.table)); err != nil {
+			return fmt.Errorf("error vacuuming table %w", err)
+		}
+		f.vacuumReportsTimer.Since(vacuumStart)
+	}
+	return nil
+}
+
 func (f *Flusher) makePOSTRequest(ctx context.Context, url string, payload interface{}) error {
 	payloadBytes, err := json.Marshal(payload)
 	if err != nil {
diff --git a/enterprise/reporting/flusher/tracked_users_test.go b/enterprise/reporting/flusher/tracked_users_test.go
index 15b4a9cc3bd..ecfc7ca7929 100644
--- a/enterprise/reporting/flusher/tracked_users_test.go
+++ b/enterprise/reporting/flusher/tracked_users_test.go
@@ -12,6 +12,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/rudderlabs/rudder-go-kit/bytesize"
+
 	"github.com/ory/dockertest/v3"
 	"github.com/segmentio/go-hll"
 	"github.com/spaolacci/murmur3"
@@ -150,12 +152,14 @@ func TestTrackedUsersFlush(t *testing.T) {
 	require.NoError(t, err)
 	err = migrateDatabase(pgContainer.DBDsn, config.Default)
 	require.NoError(t, err)
-	config.Set("TrackedUsers.enabled", true)
-	config.Set("DB.port", pgContainer.Port)
-	config.Set("DB.user", pgContainer.User)
-	config.Set("DB.name", pgContainer.Database)
-	config.Set("DB.password", pgContainer.Password)
-	config.Set("Reporting.flusher.batchSizeToReporting", 2)
+	cfg := config.Default
+	cfg.Set("Reporting.flusher.vacuumThresholdBytes", 4*bytesize.B)
+	cfg.Set("TrackedUsers.enabled", true)
+	cfg.Set("DB.port", pgContainer.Port)
+	cfg.Set("DB.user", pgContainer.User)
+	cfg.Set("DB.name", pgContainer.Database)
+	cfg.Set("DB.password", pgContainer.Password)
+	cfg.Set("Reporting.flusher.batchSizeToReporting", 2)
 
 	err = addReportsToDB(ctx, pgContainer.DB, reports)
 	assert.NoError(t, err)
@@ -170,10 +174,10 @@ func TestTrackedUsersFlush(t *testing.T) {
 	testWebhook := webhook.NewRecorder()
 	t.Cleanup(testWebhook.Close)
 	webhookURL := testWebhook.Server.URL
-	config.Set("REPORTING_URL", webhookURL)
+	cfg.Set("REPORTING_URL", webhookURL)
 
 	// create runner and start it
-	runner, err := flusher.CreateRunner(ctx, "tracked_users_reports", logger.NOP, stats.NOP, config.Default, "test")
+	runner, err := flusher.CreateRunner(ctx, "tracked_users_reports", logger.NOP, stats.NOP, cfg, "test")
 	require.NoError(t, err)
 
 	runnerDone := make(chan struct{})
@@ -185,10 +189,18 @@ func TestTrackedUsersFlush(t *testing.T) {
 	// Wait till all the reports which were added tenMinAgo to be flushed
 	require.Eventually(t, func() bool {
 		var count int
-		_ = pgContainer.DB.QueryRow("SELECT COUNT(*) FROM tracked_users_reports").Scan(&count)
+		_ = pgContainer.DB.QueryRowContext(ctx, "SELECT COUNT(*) FROM tracked_users_reports").Scan(&count)
 		return count == 1
 	}, time.Second*30, time.Millisecond*1000)
 
+	// number of dead tuples should always be 0 as we have set a very low threshold for Reporting.flusher.vacuumThresholdBytes
+	require.Never(t, func() bool {
+		var deadTuples int
+		_ = pgContainer.DB.QueryRowContext(ctx, "select n_dead_tup from pg_stat_all_tables where relname='tracked_users_reports'").Scan(&deadTuples)
+		fmt.Println(deadTuples)
+		return deadTuples > 0
+	}, 60*time.Second, time.Second)
+
 	runner.Stop()
 
 	// validate we get 2 requests, we have batch size of 2
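
Note: the net effect of this change is that, at the end of every flush cycle, the flusher checks the reporting table's estimated on-disk size via pg_table_size and runs VACUUM FULL ANALYSE once that estimate crosses the reloadable Reporting.flusher.vacuumThresholdBytes setting (default 5*bytesize.GB); the test drives the threshold down to 4 bytes so the vacuum runs on every cycle and dead tuples never accumulate. Below is a minimal standalone sketch of the same pattern for reference only; the maybeVacuum helper name and the bare *sql.DB wiring are illustrative assumptions, the actual implementation is the Flusher.vacuum method added in the diff above.

package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// maybeVacuum mirrors the pattern added in this diff: estimate the table's
// on-disk size via pg_table_size and only run VACUUM FULL ANALYZE once it
// exceeds thresholdBytes. VACUUM FULL rewrites the table under an ACCESS
// EXCLUSIVE lock, so it should not be issued on every flush.
func maybeVacuum(ctx context.Context, db *sql.DB, table string, thresholdBytes int64) error {
	var sizeEstimate int64
	if err := db.QueryRowContext(
		ctx, `SELECT pg_table_size(oid) FROM pg_class WHERE relname = $1`, table,
	).Scan(&sizeEstimate); err != nil {
		return fmt.Errorf("getting table size: %w", err)
	}
	if sizeEstimate <= thresholdBytes {
		return nil
	}
	// VACUUM cannot run inside a transaction block, so it is issued directly
	// on the *sql.DB rather than within a Tx, as the PR does.
	if _, err := db.ExecContext(ctx, fmt.Sprintf("VACUUM FULL ANALYZE %s", table)); err != nil {
		return fmt.Errorf("vacuuming table %s: %w", table, err)
	}
	return nil
}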