Skip to content

Commit

Permalink
chore: vacuum tracked_users_reports table
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Jul 29, 2024
1 parent 2e029f9 commit 9311b89
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 9 deletions.
29 changes: 29 additions & 0 deletions enterprise/reporting/flusher/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/http"
"time"

"github.com/rudderlabs/rudder-go-kit/bytesize"

"github.com/cenkalti/backoff"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -40,6 +42,7 @@ type Flusher struct {
batchSizeFromDB config.ValueLoader[int]
aggressiveFlushEnabled config.ValueLoader[bool]
lagThresholdForAggresiveFlushInMins config.ValueLoader[time.Duration]
vacuumThresholdBytes config.ValueLoader[int64]

reportingURL string
minConcurrentRequests config.ValueLoader[int]
Expand All @@ -51,6 +54,7 @@ type Flusher struct {
aggReportsTimer stats.Measurement
sendReportsTimer stats.Measurement
deleteReportsTimer stats.Measurement
vacuumReportsTimer stats.Measurement
concurrentRequests stats.Measurement
reqLatency stats.Measurement
reqCount stats.Measurement
Expand All @@ -71,6 +75,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C
batchSizeToReporting := conf.GetReloadableIntVar(10, 1, "Reporting.flusher.batchSizeToReporting")
aggressiveFlushEnabled := conf.GetReloadableBoolVar(false, "Reporting.flusher.aggressiveFlushEnabled")
lagThresholdForAggresiveFlushInMins := conf.GetReloadableDurationVar(5, time.Minute, "Reporting.flusher.lagThresholdForAggresiveFlushInMins")
vacuumThresholdBytes := conf.GetReloadableInt64Var(5*bytesize.GB, 1, "Reporting.flusher.vacuumThresholdBytes")

tr := &http.Transport{}
client := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
Expand All @@ -87,6 +92,7 @@ func NewFlusher(db *sql.DB, log logger.Logger, stats stats.Stats, conf *config.C
maxConcurrentRequests: maxConcReqs,
stats: stats,
batchSizeFromDB: batchSizeFromDB,
vacuumThresholdBytes: vacuumThresholdBytes,
table: table,
aggregator: aggregator,
batchSizeToReporting: batchSizeToReporting,
Expand Down Expand Up @@ -125,6 +131,7 @@ func (f *Flusher) initStats(tags map[string]string) {

f.sendReportsTimer = f.stats.NewTaggedStat("reporting_flusher_send_reports_duration_seconds", stats.TimerType, tags)
f.deleteReportsTimer = f.stats.NewTaggedStat("reporting_flusher_delete_reports_duration_seconds", stats.TimerType, tags)
f.vacuumReportsTimer = f.stats.NewTaggedStat("reporting_flusher_vacuum_reports_duration_seconds", stats.TimerType, tags)

f.concurrentRequests = f.stats.NewTaggedStat("reporting_flusher_concurrent_requests_in_progress", stats.GaugeType, tags)
f.reqLatency = f.stats.NewTaggedStat("reporting_flusher_http_request_duration_seconds", stats.TimerType, tags)
Expand Down Expand Up @@ -217,6 +224,11 @@ func (f *Flusher) Flush(ctx context.Context) error {
}
f.deleteReportsTimer.Since(s)

// 5. Vacuum the table
if err := f.vacuum(ctx); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -303,6 +315,23 @@ func (f *Flusher) delete(ctx context.Context, minReportedAt, maxReportedAt time.
return err
}

// vacuum reclaims disk space from the flusher's table. It estimates the
// table's on-disk size via pg_class and, only when the estimate exceeds the
// configured Reporting.flusher.vacuumThresholdBytes, runs a blocking
// VACUUM FULL ANALYSE and records its duration in vacuumReportsTimer.
// Returns a wrapped error if the size lookup or the vacuum itself fails.
func (f *Flusher) vacuum(ctx context.Context) error {
	vacuumStart := time.Now()
	var sizeEstimate int64
	// Use a bind parameter for the table name instead of string
	// interpolation; SELECT statements support placeholders.
	if err := f.db.QueryRowContext(
		ctx, `SELECT pg_table_size(oid) FROM pg_class WHERE relname = $1`, f.table,
	).Scan(&sizeEstimate); err != nil {
		return fmt.Errorf("error getting table size: %w", err)
	}
	if sizeEstimate > f.vacuumThresholdBytes.Load() {
		// VACUUM cannot take bind parameters, so the table name must be
		// interpolated here; f.table is an operator-controlled value set at
		// construction time, not user input.
		if _, err := f.db.ExecContext(ctx, fmt.Sprintf("vacuum full analyse %s", f.table)); err != nil {
			return fmt.Errorf("error vacuuming table: %w", err)
		}
		// Timer is only recorded when a vacuum actually ran.
		f.vacuumReportsTimer.Since(vacuumStart)
	}
	return nil
}

func (f *Flusher) makePOSTRequest(ctx context.Context, url string, payload interface{}) error {
payloadBytes, err := json.Marshal(payload)
if err != nil {
Expand Down
30 changes: 21 additions & 9 deletions enterprise/reporting/flusher/tracked_users_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/bytesize"

"github.com/ory/dockertest/v3"
"github.com/segmentio/go-hll"
"github.com/spaolacci/murmur3"
Expand Down Expand Up @@ -150,12 +152,14 @@ func TestTrackedUsersFlush(t *testing.T) {
require.NoError(t, err)
err = migrateDatabase(pgContainer.DBDsn, config.Default)
require.NoError(t, err)
config.Set("TrackedUsers.enabled", true)
config.Set("DB.port", pgContainer.Port)
config.Set("DB.user", pgContainer.User)
config.Set("DB.name", pgContainer.Database)
config.Set("DB.password", pgContainer.Password)
config.Set("Reporting.flusher.batchSizeToReporting", 2)
cfg := config.Default
cfg.Set("Reporting.flusher.vacuumThresholdBytes", 4*bytesize.B)
cfg.Set("TrackedUsers.enabled", true)
cfg.Set("DB.port", pgContainer.Port)
cfg.Set("DB.user", pgContainer.User)
cfg.Set("DB.name", pgContainer.Database)
cfg.Set("DB.password", pgContainer.Password)
cfg.Set("Reporting.flusher.batchSizeToReporting", 2)

err = addReportsToDB(ctx, pgContainer.DB, reports)
assert.NoError(t, err)
Expand All @@ -170,10 +174,10 @@ func TestTrackedUsersFlush(t *testing.T) {
testWebhook := webhook.NewRecorder()
t.Cleanup(testWebhook.Close)
webhookURL := testWebhook.Server.URL
config.Set("REPORTING_URL", webhookURL)
cfg.Set("REPORTING_URL", webhookURL)

// create runner and start it
runner, err := flusher.CreateRunner(ctx, "tracked_users_reports", logger.NOP, stats.NOP, config.Default, "test")
runner, err := flusher.CreateRunner(ctx, "tracked_users_reports", logger.NOP, stats.NOP, cfg, "test")
require.NoError(t, err)

runnerDone := make(chan struct{})
Expand All @@ -185,10 +189,18 @@ func TestTrackedUsersFlush(t *testing.T) {
// Wait till all the reports which were added tenMinAgo to be flushed
require.Eventually(t, func() bool {
var count int
_ = pgContainer.DB.QueryRow("SELECT COUNT(*) FROM tracked_users_reports").Scan(&count)
_ = pgContainer.DB.QueryRowContext(ctx, "SELECT COUNT(*) FROM tracked_users_reports").Scan(&count)
return count == 1
}, time.Second*30, time.Millisecond*1000)

// number of dead tuples should always be 0, as we have set a very low threshold for Reporting.flusher.vacuumThresholdBytes
require.Never(t, func() bool {
var deadTuples int
_ = pgContainer.DB.QueryRowContext(ctx, "select n_dead_tup from pg_stat_all_tables where relname='tracked_users_reports'").Scan(&deadTuples)
fmt.Println(deadTuples)
return deadTuples > 0
}, 60*time.Second, time.Second)

runner.Stop()

// validate we get 2 requests, we have batch size of 2
Expand Down

0 comments on commit 9311b89

Please sign in to comment.