Skip to content

Commit

Permalink
fix: vaccum handling in error detail reports table (#4945)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanpj2292 authored Aug 5, 2024
1 parent a27b583 commit 77b75fb
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 24 deletions.
78 changes: 57 additions & 21 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/samber/lo"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -80,6 +81,9 @@ type ErrorDetailReporter struct {
minReportedAtQueryTime stats.Measurement
errorDetailReportsQueryTime stats.Measurement
edReportingRequestLatency stats.Measurement

stats stats.Stats
config *config.Config
}

type errorDetails struct {
Expand All @@ -90,16 +94,18 @@ type errorDetails struct {
func NewErrorDetailReporter(
ctx context.Context,
configSubscriber *configSubscriber,
stats stats.Stats,
conf *config.Config,
) *ErrorDetailReporter {
tr := &http.Transport{}
reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com")
reportingServiceURL := conf.GetString("REPORTING_URL", "https://reporting.dev.rudderlabs.com")
reportingServiceURL = strings.TrimSuffix(reportingServiceURL, "/")

netClient := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
mainLoopSleepInterval := config.GetReloadableDurationVar(5, time.Second, "Reporting.mainLoopSleepInterval")
sleepInterval := config.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := config.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := config.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")
netClient := &http.Client{Transport: tr, Timeout: conf.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
mainLoopSleepInterval := conf.GetReloadableDurationVar(5, time.Second, "Reporting.mainLoopSleepInterval")
sleepInterval := conf.GetReloadableDurationVar(30, time.Second, "Reporting.sleepInterval")
maxConcurrentRequests := conf.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests")
maxOpenConnections := conf.GetIntVar(16, 1, "Reporting.errorReporting.maxOpenConnections")

log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting")
extractor := NewErrorDetailExtractor(log)
Expand All @@ -117,13 +123,15 @@ func NewErrorDetailReporter(
httpClient: netClient,

namespace: config.GetKubeNamespace(),
instanceID: config.GetString("INSTANCE_ID", "1"),
region: config.GetString("region", ""),
instanceID: conf.GetString("INSTANCE_ID", "1"),
region: conf.GetString("region", ""),

configSubscriber: configSubscriber,
syncers: make(map[string]*types.SyncSource),
errorDetailExtractor: extractor,
maxOpenConnections: maxOpenConnections,
stats: stats,
config: conf,
}
}

Expand All @@ -143,7 +151,7 @@ func (edr *ErrorDetailReporter) DatabaseSyncer(c types.SyncerConfig) types.Repor
}
edr.syncers[c.ConnInfo] = &types.SyncSource{SyncerConfig: c, DbHandle: dbHandle}

if !config.GetBool("Reporting.errorReporting.syncer.enabled", true) {
if !edr.config.GetBool("Reporting.errorReporting.syncer.enabled", true) {
return func() {}
}

Expand Down Expand Up @@ -197,7 +205,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)

stats.Default.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
"workspaceId": workspaceID,
"destType": destinationDetail.destType,
Expand Down Expand Up @@ -256,7 +264,7 @@ func (edr *ErrorDetailReporter) migrate(c types.SyncerConfig) (*sql.DB, error) {
Handle: dbHandle,
MigrationsTable: fmt.Sprintf("%v_migrations", ErrorDetailReportsTable),
// TODO: shall we use separate env ?
ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true),
ShouldForceSetLowerVersion: edr.config.GetBool("SQLMigrator.forceSetLowerVersion", true),
}
err = m.Migrate(ErrorDetailReportsTable)
if err != nil {
Expand Down Expand Up @@ -297,17 +305,17 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf

tags := edr.getTags(c.Label)

mainLoopTimer := stats.Default.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := stats.Default.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := stats.Default.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := stats.Default.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := stats.Default.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)
mainLoopTimer := edr.stats.NewTaggedStat("error_detail_reports_main_loop_time", stats.TimerType, tags)
getReportsTimer := edr.stats.NewTaggedStat("error_detail_reports_get_reports_time", stats.TimerType, tags)
aggregateTimer := edr.stats.NewTaggedStat("error_detail_reports_aggregate_time", stats.TimerType, tags)
getReportsSize := edr.stats.NewTaggedStat("error_detail_reports_size", stats.HistogramType, tags)
getAggregatedReportsSize := edr.stats.NewTaggedStat("error_detail_reports_aggregated_size", stats.HistogramType, tags)

errorDetailReportsDeleteQueryTimer := stats.Default.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)
errorDetailReportsDeleteQueryTimer := edr.stats.NewTaggedStat("error_detail_reports_delete_query_time", stats.TimerType, tags)

edr.minReportedAtQueryTime = stats.Default.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = stats.Default.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = stats.Default.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)
edr.minReportedAtQueryTime = edr.stats.NewTaggedStat("error_detail_reports_min_reported_at_query_time", stats.TimerType, tags)
edr.errorDetailReportsQueryTime = edr.stats.NewTaggedStat("error_detail_reports_query_time", stats.TimerType, tags)
edr.edReportingRequestLatency = edr.stats.NewTaggedStat("error_detail_reporting_request_latency", stats.TimerType, tags)

// In infinite loop
// Get Reports
Expand Down Expand Up @@ -375,6 +383,8 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf
if err != nil {
edr.log.Errorf("[ Error Detail Reporting ]: Error deleting local reports from %s: %v", ErrorDetailReportsTable, err)
}
// vacuum error_reports_details table
edr.vacuum(ctx, dbHandle, c)
}

mainLoopTimer.Since(loopStart)
Expand All @@ -386,6 +396,32 @@ func (edr *ErrorDetailReporter) mainLoop(ctx context.Context, c types.SyncerConf
}
}

func (edr *ErrorDetailReporter) vacuum(ctx context.Context, dbHandle *sql.DB, c types.SyncerConfig) {
tags := edr.getTags(c.Label)
var sizeEstimate int64
if err := dbHandle.QueryRowContext(
ctx,
`SELECT pg_table_size(oid) from pg_class where relname = $1`, ErrorDetailReportsTable,
).Scan(&sizeEstimate); err != nil {
edr.log.Errorn(
fmt.Sprintf(`Error getting %s table size estimate`, ErrorDetailReportsTable),
logger.NewErrorField(err),
)
}
if sizeEstimate > edr.config.GetInt64("Reporting.errorReporting.vacuumThresholdBytes", 5*bytesize.GB) {
vacuumStart := time.Now()
vacuumDuration := edr.stats.NewTaggedStat(StatReportingVacuumDuration, stats.TimerType, tags)
vaccumStatement := fmt.Sprintf("vacuum full analyze %s", pq.QuoteIdentifier(ErrorDetailReportsTable))
if _, err := dbHandle.ExecContext(ctx, vaccumStatement); err != nil {
edr.log.Errorn(
fmt.Sprintf(`Error vacuuming %s table`, ErrorDetailReportsTable),
logger.NewErrorField(err),
)
}
vacuumDuration.Since(vacuumStart)
}
}

func (edr *ErrorDetailReporter) getReports(ctx context.Context, currentMs int64, syncerKey string) ([]*types.EDReportsDB, int64) {
var queryMin sql.NullInt64
dbHandle, err := edr.getDBHandle(syncerKey)
Expand Down Expand Up @@ -602,7 +638,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me
edr.edReportingRequestLatency.Since(httpRequestStart)
httpStatTags := edr.getTags(label)
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()
edr.stats.NewTaggedStat("error_detail_reporting_http_request", stats.CountType, httpStatTags).Increment()

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke

// error reporting implementation
if config.GetBool("Reporting.errorReporting.enabled", false) {
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber)
errorReporter := NewErrorDetailReporter(rm.ctx, configSubscriber, rm.stats, config.Default)
rm.reporters = append(rm.reporters, errorReporter)
}

Expand Down
5 changes: 3 additions & 2 deletions enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -415,7 +416,7 @@ func TestExtractErrorDetails(t *testing.T) {
},
}

edr := NewErrorDetailReporter(context.Background(), &configSubscriber{})
edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP, config.Default)
for _, tc := range testCases {
t.Run(tc.caseDescription, func(t *testing.T) {
errorDetails := edr.extractErrorDetails(tc.inputErrMsg)
Expand Down Expand Up @@ -599,7 +600,7 @@ func TestAggregationLogic(t *testing.T) {
},
}
configSubscriber := newConfigSubscriber(logger.NOP)
ed := NewErrorDetailReporter(context.Background(), configSubscriber)
ed := NewErrorDetailReporter(context.Background(), configSubscriber, stats.NOP, config.Default)
reportingMetrics := ed.aggregate(dbErrs)

reportResults := []*types.EDMetric{
Expand Down

0 comments on commit 77b75fb

Please sign in to comment.