Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: vacuum handling in error detail reports table #4945

Merged
merged 9 commits into from
Aug 5, 2024
Merged
56 changes: 45 additions & 11 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/samber/lo"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -80,6 +81,8 @@ type ErrorDetailReporter struct {
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement

stats stats.Stats
}

type errorDetails struct {
Expand All @@ -90,6 +93,7 @@ type errorDetails struct {
func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
stats stats.Stats,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏 for injecting stats

) *ErrorDetailReporter {
tr := &http.Transport{}
reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com")
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewErrorDetailReporter(
syncers: make(map[string]*types.SyncSource),
errorDetailExtractor: extractor,
maxOpenConnections: maxOpenConnections,
stats: stats,
}
}

Expand Down Expand Up @@ -197,7 +202,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)

stats.Default.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
Expand Down Expand Up @@ -297,17 +302,17 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf

tags := edr.getTags(c.Label)

mainLoopTimer := stats.Default.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := stats.Default.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := stats.Default.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := stats.Default.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := stats.Default.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)
mainLoopTimer := edr.stats.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := edr.stats.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := edr.stats.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := edr.stats.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := edr.stats.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)

errorDetailReportsDeleteQueryTimer := stats.Default.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)
errorDetailReportsDeleteQueryTimer := edr.stats.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)

edr.minReportedAtQueryTime = stats.Default.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = stats.Default.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = stats.Default.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)
edr.minReportedAtQueryTime = edr.stats.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = edr.stats.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = edr.stats.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)

// In infinite loop
// Get Reports
Expand Down Expand Up @@ -375,6 +380,8 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf
if err != nil {
edr.log.Errorf("[ Error Detail Reporting ]: Error deleting local reports from %s: %v", ErrorDetailReportsTable, err)
}
// vacuum error_reports_details table
edr.vacuum(ctx, dbHandle, c)
}

mainLoopTimer.Since(loopStart)
Expand All @@ -386,6 +393,33 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf
}
}

func (edr *ErrorDetailReporter) vacuum(ctx context.Context, dbHandle *sql.DB, c types.SyncerConfig) {
tags := edr.getTags(c.Label)
vacuumDuration := edr.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags)

vacuumStart := time.Now()
var sizeEstimate int64
if err := dbHandle.QueryRowContext(
ctx,
`SELECT pg_table_size(oid) from pg_class where relname = $1`, ErrorDetailReportsTable,
).Scan(&sizeEstimate); err != nil {
edr.log.Errorn(
fmt.Sprintf(`Error getting %s table size estimate`, ErrorDetailReportsTable),
logger.NewErrorField(err),
)
}
if sizeEstimate > config.GetInt64("Reporting.errorReporting.vacuumThresholdBytes", 5*bytesize.GB) {
vaccumStatement := fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable))
if _, err := dbHandle.ExecContext(ctx, vaccumStatement); err != nil {
edr.log.Errorn(
fmt.Sprintf(`Error vacuuming %s table`, ErrorDetailReportsTable),
logger.NewErrorField(err),
)
}
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
}
vacuumDuration.Since(vacuumStart)
}

func (edr *ErrorDetailReporter) getReports(ctx context.Context, currentMs int64, syncerKey string) ([]*types.EDReportsDB, int64) {
var queryMin sql.NullInt64
dbHandle, err := edr.getDBHandle(syncerKey)
Expand Down Expand Up @@ -602,7 +636,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me
edr.edReportingRequestLatency.Since(httpRequestStart)
httpStatTags := edr.getTags(label)
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()
edr.stats.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke

// error reporting implementation
if config.GetBool("Reporting.errorReporting.enabled", false) {
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber)
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber, rm.stats)
rm.reporters = append(rm.reporters, errorReporter)
}

Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestExtractErrorDetails(t *testing.T) {
},
}

edr := NewErrorDetailReporter(context.Background(), &configSubscriber{})
edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP)
for _, tc := range testCases {
t.Run(tc.caseDescription, func(t *testing.T) {
errorDetails := edr.extractErrorDetails(tc.inputErrMsg)
Expand Down Expand Up @@ -599,7 +599,7 @@ func TestAggregationLogic(t *testing.T) {
},
}
configSubscriber := newConfigSubscriber(logger.NOP)
ed := NewErrorDetailReporter(context.Background(), configSubscriber)
ed := NewErrorDetailReporter(context.Background(), configSubscriber, stats.NOP)
reportingMetrics := ed.aggregate(dbErrs)

reportResults := []*types.EDMetric{
Expand Down
Loading