Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: vaccum handling in error detail reports table #4945

Merged
merged 9 commits into from
Aug 5, 2024
Merged
50 changes: 39 additions & 11 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/samber/lo"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -80,6 +81,8 @@ type ErrorDetailReporter struct {
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement

stats stats.Stats
}

type errorDetails struct {
Expand All @@ -90,6 +93,7 @@ type errorDetails struct {
func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
stats stats.Stats,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏 for injecting stats

) *ErrorDetailReporter {
tr := &http.Transport{}
reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com")
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewErrorDetailReporter(
syncers: make(map[string]*types.SyncSource),
errorDetailExtractor: extractor,
maxOpenConnections: maxOpenConnections,
stats: stats,
}
}

Expand Down Expand Up @@ -197,7 +202,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)

stats.Default.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
Expand Down Expand Up @@ -297,17 +302,19 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf

tags := edr.getTags(c.Label)

mainLoopTimer := stats.Default.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := stats.Default.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := stats.Default.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := stats.Default.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := stats.Default.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)
mainLoopTimer := edr.stats.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := edr.stats.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := edr.stats.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := edr.stats.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := edr.stats.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)

errorDetailReportsDeleteQueryTimer := edr.stats.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)

errorDetailReportsDeleteQueryTimer := stats.Default.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)
edr.minReportedAtQueryTime = edr.stats.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = edr.stats.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = edr.stats.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)

edr.minReportedAtQueryTime = stats.Default.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = stats.Default.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = stats.Default.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)
vacuumDuration := edr.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags)

// In infinite loop
// Get Reports
Expand Down Expand Up @@ -375,6 +382,27 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf
if err != nil {
edr.log.Errorf("[ Error Detail Reporting ]: Error deleting local reports from %s: %v", ErrorDetailReportsTable, err)
}
vacuumStart := time.Now()
var sizeEstimate int64
if err := dbHandle.QueryRowContext(
ctx,
fmt.Sprintf(`SELECT pg_table_size(oid) from pg_class where relname='%s';`, ErrorDetailReportsTable),
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
).Scan(&sizeEstimate); err != nil {
edr.log.Errorn(
`[ Error detail Reporting ]: Error getting table size estimate`,
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
logger.NewErrorField(err),
)
}
if sizeEstimate > config.GetInt64("Reporting.errorReporting.vacuumThresholdBytes", 5*bytesize.GB) {
vaccumStatement := fmt.Sprintf("vacuum full analyze %s;", ErrorDetailReportsTable)
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
if _, err := dbHandle.ExecContext(ctx, vaccumStatement); err != nil {
edr.log.Errorn(
`[ Error detail Reporting ]: Error vacuuming error_detail_reports table`,
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
logger.NewErrorField(err),
)
}
vacuumDuration.Since(vacuumStart)
}
}

mainLoopTimer.Since(loopStart)
Expand Down Expand Up @@ -602,7 +630,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me
edr.edReportingRequestLatency.Since(httpRequestStart)
httpStatTags := edr.getTags(label)
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()
edr.stats.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke

// error reporting implementation
if config.GetBool("Reporting.errorReporting.enabled", false) {
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber)
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber, rm.stats)
rm.reporters = append(rm.reporters, errorReporter)
}

Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestExtractErrorDetails(t *testing.T) {
},
}

edr := NewErrorDetailReporter(context.Background(), &configSubscriber{})
edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP)
for _, tc := range testCases {
t.Run(tc.caseDescription, func(t *testing.T) {
errorDetails := edr.extractErrorDetails(tc.inputErrMsg)
Expand Down Expand Up @@ -599,7 +599,7 @@ func TestAggregationLogic(t *testing.T) {
},
}
configSubscriber := newConfigSubscriber(logger.NOP)
ed := NewErrorDetailReporter(context.Background(), configSubscriber)
ed := NewErrorDetailReporter(context.Background(), configSubscriber, stats.NOP)
reportingMetrics := ed.aggregate(dbErrs)

reportResults := []*types.EDMetric{
Expand Down
Loading