diff --git a/enterprise/reporting/error_reporting.go b/enterprise/reporting/error_reporting.go index a537e23403..d713b24665 100644 --- a/enterprise/reporting/error_reporting.go +++ b/enterprise/reporting/error_reporting.go @@ -19,6 +19,7 @@ import ( "github.com/samber/lo" "golang.org/x/sync/errgroup" + "github.com/rudderlabs/rudder-go-kit/bytesize" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -80,6 +81,9 @@ type ErrorDetailReporter struct { minReportedAtQueryTime stats.Measurement errorDetailReportsQueryTime stats.Measurement edReportingRequestLatency stats.Measurement + + stats stats.Stats + config *config.Config } type errorDetails struct { @@ -90,16 +94,18 @@ type errorDetails struct { func NewErrorDetailReporter( ctx context.Context, configSubscriber *configSubscriber, + stats stats.Stats, + conf *config.Config, ) *ErrorDetailReporter { tr := &http.Transport{} - reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com") + reportingServiceURL := conf.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com") reportingServiceURL = strings.TrimSuffix(reportingServiceURL, "/") - netClient := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)} - mainLoopSleepInterval := config.GetReloadableDurationVar(5, time.Second, "Reporting.mainLoopSleepInterval") - sleepInterval := config.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval") - maxConcurrentRequests := config.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests") - maxOpenConnections := config.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections") + netClient := &http.Client{Transport: tr, Timeout: conf.GetDuration("HttpClient.reporting.timeout", 60, time.Second)} + mainLoopSleepInterval := conf.GetReloadableDurationVar(5, time.Second, "Reporting.mainLoopSleepInterval") + sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval") + maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests") + maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections") log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting") extractor := NewErrorDetailExtractor(log) @@ -117,13 +123,15 @@ func NewErrorDetailReporter( httpClient: netClient, namespace: config.GetKubeNamespace(), - instanceID: config.GetString("INSTANCE_ID", "1"), - region: config.GetString("region", ""), + instanceID: conf.GetString("INSTANCE_ID", "1"), + region: conf.GetString("region", ""), configSubscriber: configSubscriber, syncers: make(map[string]*types.SyncSource), errorDetailExtractor: extractor, maxOpenConnections: maxOpenConnections, + stats: stats, + config: conf, } } @@ -143,7 +151,7 @@ func (edr *ErrorDetailReporter) DatabaseSyncer(c types.SyncerConfig) types.Repor } edr.syncers[c.ConnInfo] = &types.SyncSource{SyncerConfig: c, DbHandle: dbHandle} - if !config.GetBool("Reporting.errorReporting.syncer.enabled", true) { + if !edr.config.GetBool("Reporting.errorReporting.syncer.enabled", true) { return func() {} } @@ -197,7 +205,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR // extract error-message & error-code errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse) - stats.Default.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{ + edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{ "errorCode": errDets.ErrorCode, "workspaceId": workspaceID, "destType": destinationDetail.destType, @@ -256,7 +264,7 @@ func (edr *ErrorDetailReporter) migrate(c types.SyncerConfig) (*sql.DB, error) { Handle: dbHandle, MigrationsTable: fmt.Sprintf("%v_migrations", ErrorDetailReportsTable), // TODO: shall we use separate env ? - ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true), + ShouldForceSetLowerVersion: edr.config.GetBool("SQLMigrator.forceSetLowerVersion", true), } err = m.Migrate(ErrorDetailReportsTable) if err != nil { @@ -297,17 +305,17 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf tags := edr.getTags(c.Label) - mainLoopTimer := stats.Default.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags) - getReportsTimer := stats.Default.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags) - aggregateTimer := stats.Default.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags) - getReportsSize := stats.Default.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags) - getAggregatedReportsSize := stats.Default.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags) + mainLoopTimer := edr.stats.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags) + getReportsTimer := edr.stats.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags) + aggregateTimer := edr.stats.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags) + getReportsSize := edr.stats.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags) + getAggregatedReportsSize := edr.stats.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags) - errorDetailReportsDeleteQueryTimer := stats.Default.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags) + errorDetailReportsDeleteQueryTimer := edr.stats.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags) - edr.minReportedAtQueryTime = stats.Default.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags) - edr.errorDetailReportsQueryTime = stats.Default.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags) - edr.edReportingRequestLatency = stats.Default.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags) + edr.minReportedAtQueryTime = edr.stats.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags) + edr.errorDetailReportsQueryTime = edr.stats.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags) + edr.edReportingRequestLatency = edr.stats.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags) // In infinite loop // Get Reports @@ -375,6 +383,8 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf if err != nil { edr.log.Errorf("[ Error Detail Reporting ]: Error deleting local reports from %s: %v", ErrorDetailReportsTable, err) } + // vacuum error_reports_details table + edr.vacuum(ctx, dbHandle, c) } mainLoopTimer.Since(loopStart) @@ -386,6 +396,32 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf } } +func (edr *ErrorDetailReporter) vacuum(ctx context.Context, dbHandle *sql.DB, c types.SyncerConfig) { + tags := edr.getTags(c.Label) + var sizeEstimate int64 + if err := dbHandle.QueryRowContext( + ctx, + `SELECT pg_table_size(oid) from pg_class where relname = $1`, ErrorDetailReportsTable, + ).Scan(&sizeEstimate); err != nil { + edr.log.Errorn( + fmt.Sprintf(`Error getting %s table size estimate`, ErrorDetailReportsTable), + logger.NewErrorField(err), + ) + } + if sizeEstimate > edr.config.GetInt64("Reporting.errorReporting.vacuumThresholdBytes", 5*bytesize.GB) { + vacuumStart := time.Now() + vacuumDuration := edr.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags) + vaccumStatement := fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable)) + if _, err := dbHandle.ExecContext(ctx, vaccumStatement); err != nil { + edr.log.Errorn( + fmt.Sprintf(`Error vacuuming %s table`, ErrorDetailReportsTable), + logger.NewErrorField(err), + ) + } + vacuumDuration.Since(vacuumStart) + } +} + func (edr *ErrorDetailReporter) getReports(ctx context.Context, currentMs int64, syncerKey string) ([]*types.EDReportsDB, int64) { var queryMin sql.NullInt64 dbHandle, err := edr.getDBHandle(syncerKey) @@ -602,7 +638,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me edr.edReportingRequestLatency.Since(httpRequestStart) httpStatTags := edr.getTags(label) httpStatTags["status"] = strconv.Itoa(resp.StatusCode) - stats.Default.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment() + edr.stats.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment() defer func() { httputil.CloseResponse(resp) }() respBody, err := io.ReadAll(resp.Body) diff --git a/enterprise/reporting/mediator.go b/enterprise/reporting/mediator.go index 6738a331e8..8146dcb185 100644 --- a/enterprise/reporting/mediator.go +++ b/enterprise/reporting/mediator.go @@ -63,7 +63,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke // error reporting implementation if config.GetBool("Reporting.errorReporting.enabled", false) { - errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber) + errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber, rm.stats, config.Default) rm.reporters = append(rm.reporters, errorReporter) } diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 75d498c6f9..a5c6369741 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/stats" . "github.com/onsi/ginkgo/v2" @@ -415,7 +416,7 @@ func TestExtractErrorDetails(t *testing.T) { }, } - edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}) + edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP, config.Default) for _, tc := range testCases { t.Run(tc.caseDescription, func(t *testing.T) { errorDetails := edr.extractErrorDetails(tc.inputErrMsg) @@ -599,7 +600,7 @@ func TestAggregationLogic(t *testing.T) { }, } configSubscriber := newConfigSubscriber(logger.NOP) - ed := NewErrorDetailReporter(context.Background(), configSubscriber) + ed := NewErrorDetailReporter(context.Background(), configSubscriber, stats.NOP, config.Default) reportingMetrics := ed.aggregate(dbErrs) reportResults := []*types.EDMetric{