Skip to content

Commit

Permalink
fix: server panics during shutdown with reporting metrics: failed to …
Browse files Browse the repository at this point in the history
…store jobs: context canceled (#4228)
  • Loading branch information
atzoum authored Dec 12, 2023
1 parent 5b94dba commit c5cb5a8
Show file tree
Hide file tree
Showing 16 changed files with 44 additions and 36 deletions.
6 changes: 3 additions & 3 deletions enterprise/reporting/error_index/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewErrorIndexReporter(ctx context.Context, log logger.Logger, configSubscri
}

// Report reports the metrics to the errorIndex JobsDB
func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error {
func (eir *ErrorIndexReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, tx *Tx) error {
failedAt := eir.now()

var jobs []*jobsdb.JobT
Expand Down Expand Up @@ -171,8 +171,8 @@ func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx)
if err != nil {
return fmt.Errorf("failed to resolve jobsdb: %w", err)
}
if err := db.WithStoreSafeTxFromTx(eir.ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return db.StoreInTx(eir.ctx, tx, jobs)
if err := db.WithStoreSafeTxFromTx(ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return db.StoreInTx(ctx, tx, jobs)
}); err != nil {
return fmt.Errorf("failed to store jobs: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions enterprise/reporting/error_index/error_index_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report(tc.reports, tx)
err = eir.Report(context.Background(), tc.reports, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())
db, err := eir.resolveJobsDB(tx)
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{}, tx)
err = eir.Report(context.Background(), []*types.PUReportedMetric{}, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())

Expand Down Expand Up @@ -342,7 +342,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := pg2.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{
err = eir.Report(context.Background(), []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := pg1.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{
err = eir.Report(context.Background(), []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestErrorIndexReporter(t *testing.T) {
sqlTx, err := pg3.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqlTx}
err = eir.Report([]*types.PUReportedMetric{
err = eir.Report(context.Background(), []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
Expand Down Expand Up @@ -571,7 +571,7 @@ func TestErrorIndexReporter(t *testing.T) {
require.NoError(t, err)

tx := &Tx{Tx: sqlTx}
err = eir.Report(reports, tx)
err = eir.Report(context.Background(), reports, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())

Expand Down
8 changes: 4 additions & 4 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ func (edr *ErrorDetailReporter) GetSyncer(syncerKey string) *types.SyncSource {
return edr.syncers[syncerKey]
}

func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error {
edr.log.Debug("[ErrorDetailReport] Report method called\n")
if len(metrics) == 0 {
return nil
}

stmt, err := txn.Prepare(pq.CopyIn(ErrorDetailReportsTable, ErrorDetailReportsColumns...))
stmt, err := txn.PrepareContext(ctx, pq.CopyIn(ErrorDetailReportsTable, ErrorDetailReportsColumns...))
if err != nil {
edr.log.Errorf("Failed during statement preparation: %v", err)
return fmt.Errorf("preparing statement: %v", err)
Expand All @@ -180,7 +180,7 @@ func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *T

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)
_, err = stmt.Exec(
_, err = stmt.ExecContext(ctx,
workspaceID,
edr.namespace,
edr.instanceID,
Expand All @@ -203,7 +203,7 @@ func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *T
}
}

_, err = stmt.Exec()
_, err = stmt.ExecContext(ctx)
if err != nil {
edr.log.Errorf("Failed during statement preparation: %v", err)
return fmt.Errorf("executing final statement: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/event_stats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reporting

import (
"context"
"strconv"

"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -39,7 +40,7 @@ func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) {
}
}

func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error {
func (es *EventStatsReporter) Report(_ context.Context, metrics []*types.PUReportedMetric, tx *Tx) error {
tx.AddSuccessListener(func() {
es.Record(metrics)
})
Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke
return rm
}

func (rm *Mediator) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
func (rm *Mediator) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error {
for _, reporter := range rm.reporters {
if err := reporter.Report(metrics, txn); err != nil {
if err := reporter.Report(ctx, metrics, txn); err != nil {
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion enterprise/reporting/noop.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package reporting

import (
"context"

. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

// NOOP reporting implementation that does nothing
type NOOP struct{}

func (*NOOP) Report(_ []*types.PUReportedMetric, _ *Tx) error {
func (*NOOP) Report(_ context.Context, _ []*types.PUReportedMetric, _ *Tx) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,12 @@ func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) t
return metric
}

func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReportedMetric, txn *Tx) error {
if len(metrics) == 0 {
return nil
}

stmt, err := txn.Prepare(pq.CopyIn(ReportsTable,
stmt, err := txn.PrepareContext(ctx, pq.CopyIn(ReportsTable,
"workspace_id", "namespace", "instance_id",
"source_definition_id",
"source_category",
Expand Down Expand Up @@ -617,7 +617,7 @@ func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) err
return fmt.Errorf("executing statement: %v", err)
}
}
if _, err = stmt.Exec(); err != nil {
if _, err = stmt.ExecContext(ctx); err != nil {
return fmt.Errorf("executing final statement: %v", err)
}

Expand Down
9 changes: 5 additions & 4 deletions mocks/utils/types/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2222,13 +2222,13 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
if err != nil {
return fmt.Errorf("publishing rsources stats: %w", err)
}
err = proc.saveDroppedJobs(in.droppedJobs, tx.Tx())
err = proc.saveDroppedJobs(ctx, in.droppedJobs, tx.Tx())
if err != nil {
return fmt.Errorf("saving dropped jobs: %w", err)
}

if proc.isReportingEnabled() {
if err = proc.reporting.Report(in.reportMetrics, tx.Tx()); err != nil {
if err = proc.reporting.Report(ctx, in.reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down Expand Up @@ -2684,7 +2684,7 @@ func (proc *Handle) transformSrcDest(
}
}

func (proc *Handle) saveDroppedJobs(droppedJobs []*jobsdb.JobT, tx *Tx) error {
func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error {
if len(droppedJobs) > 0 {
for i := range droppedJobs { // each dropped job should have a unique jobID in the scope of the batch
droppedJobs[i].JobID = int64(i)
Expand All @@ -2694,7 +2694,7 @@ func (proc *Handle) saveDroppedJobs(droppedJobs []*jobsdb.JobT, tx *Tx) error {
rsources.IgnoreDestinationID(),
)
rsourcesStats.JobsDropped(droppedJobs)
return rsourcesStats.Publish(context.TODO(), tx.Tx)
return rsourcesStats.Publish(ctx, tx.Tx)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions router/batchrouter/handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string,
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
return fmt.Errorf("marking %s job statuses as aborted: %w", brt.destType, err)
}
if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = brt.reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion router/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package router

import (
"context"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -50,5 +52,5 @@ func (f *Factory) New(destination *backendconfig.DestinationT) *Handle {
}

type reporter interface {
Report(metrics []*utilTypes.PUReportedMetric, txn *Tx) error
Report(ctx context.Context, metrics []*utilTypes.PUReportedMetric, txn *Tx) error
}
2 changes: 1 addition & 1 deletion router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) {
if err != nil {
return err
}
if err = rt.Reporting.Report(reportMetrics, tx.Tx()); err != nil {
if err = rt.Reporting.Report(ctx, reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package types

import (
"context"
"net/http"
"time"

Expand Down Expand Up @@ -62,7 +63,7 @@ type ConfigEnvI interface {
// Reporting is interface to report metrics
type Reporting interface {
// Report reports metrics to reporting service
Report(metrics []*PUReportedMetric, tx *Tx) error
Report(ctx context.Context, metrics []*PUReportedMetric, tx *Tx) error

// DatabaseSyncer creates reporting tables in the database and returns a function to periodically sync the data
DatabaseSyncer(c SyncerConfig) ReportingSyncer
Expand Down
3 changes: 2 additions & 1 deletion warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ func (job *UploadJob) setUploadStatus(statusOpts UploadStatusOpts) (err error) {
}
if job.config.reportingEnabled {
err = job.reporting.Report(
job.ctx,
[]*types.PUReportedMetric{&statusOpts.ReportingMetric},
tx.Tx,
)
Expand Down Expand Up @@ -717,7 +718,7 @@ func (job *UploadJob) setUploadError(statusError error, state string) (string, e
})
}
if job.config.reportingEnabled {
if err = job.reporting.Report(reportingMetrics, txn.Tx); err != nil {
if err = job.reporting.Report(job.ctx, reportingMetrics, txn.Tx); err != nil {
return "", fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down

0 comments on commit c5cb5a8

Please sign in to comment.