Skip to content

Commit

Permalink
chore: respect transaction in error index reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Oct 19, 2023
1 parent 4db35e9 commit b90288b
Show file tree
Hide file tree
Showing 27 changed files with 107 additions and 65 deletions.
8 changes: 5 additions & 3 deletions enterprise/reporting/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package reporting

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
Expand All @@ -13,6 +12,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -79,7 +79,7 @@ func NewErrorIndexReporter(
}

// Report reports the metrics to the errorIndex JobsDB
func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, _ *sql.Tx) error {
func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error {
failedAt := eir.now()

var jobs []*jobsdb.JobT
Expand Down Expand Up @@ -134,7 +134,9 @@ func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, _ *sql.
return nil
}

if err := eir.errIndexDB.Store(eir.ctx, jobs); err != nil {
if err := eir.errIndexDB.WithStoreSafeTxFromTx(eir.ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return eir.errIndexDB.StoreInTx(eir.ctx, tx, jobs)
}); err != nil {
return fmt.Errorf("failed to store jobs: %v", err)
}

Expand Down
16 changes: 11 additions & 5 deletions enterprise/reporting/error_index_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/rudderlabs/rudder-server/jobsdb"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -263,10 +264,12 @@ func TestErrorIndexReporter(t *testing.T) {
defer func() {
eir.errIndexDB.TearDown()
}()

err = eir.Report(tc.reports, nil)
sqltx, err := postgresContainer.DB.Begin()
require.NoError(t, err)

tx := &Tx{Tx: sqltx}
err = eir.Report(tc.reports, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())
jr, err := eir.errIndexDB.GetUnprocessed(ctx, jobsdb.GetQueryParams{
JobsLimit: 100,
})
Expand Down Expand Up @@ -328,9 +331,12 @@ func TestErrorIndexReporter(t *testing.T) {

eir := NewErrorIndexReporter(ctx, c, logger.NOP, cs)
defer eir.errIndexDB.TearDown()

err = eir.Report([]*types.PUReportedMetric{}, nil)
sqltx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqltx}
err = eir.Report([]*types.PUReportedMetric{}, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())

syncerDone := make(chan struct{})
go func() {
Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -149,7 +150,7 @@ func (edr *ErrorDetailReporter) GetSyncer(syncerKey string) *types.SyncSource {
return edr.syncers[syncerKey]
}

func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) error {
func (edr *ErrorDetailReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) error {

Check warning on line 153 in enterprise/reporting/error_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_reporting.go#L153

Added line #L153 was not covered by tests
edr.log.Debug("[ErrorDetailReport] Report method called\n")
if len(metrics) == 0 {
return nil
Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package reporting

import (
"context"
"database/sql"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -56,7 +56,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke
return rm
}

func (rm *Mediator) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) error {
func (rm *Mediator) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
for _, reporter := range rm.reporters {
if err := reporter.Report(metrics, txn); err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions enterprise/reporting/noop.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package reporting

import (
"database/sql"

. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

// NOOP reporting implementation that does nothing
type NOOP struct{}

func (*NOOP) Report(_ []*types.PUReportedMetric, _ *sql.Tx) error {
func (*NOOP) Report(_ []*types.PUReportedMetric, _ *Tx) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -512,7 +513,7 @@ func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) t
return metric
}

func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) error {
func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) error {
if len(metrics) == 0 {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions jobsdb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
)

func (jd *Handle) isBackupEnabled() bool {
Expand Down
1 change: 1 addition & 0 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
)

const (
Expand Down
33 changes: 10 additions & 23 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/rudderlabs/rudder-server/jobsdb/internal/cache"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -101,29 +102,6 @@ type GetQueryParams struct {
PayloadSizeLimit int64
}

// Tx is a wrapper around sql.Tx that supports registering and executing
// post-commit actions, a.k.a. success listeners.
type Tx struct {
*sql.Tx
successListeners []func()
}

// AddSuccessListener registers a listener to be executed after the transaction has been committed successfully.
func (tx *Tx) AddSuccessListener(listener func()) {
tx.successListeners = append(tx.successListeners, listener)
}

// Commit commits the transaction and executes all listeners.
func (tx *Tx) Commit() error {
err := tx.Tx.Commit()
if err == nil {
for _, successListener := range tx.successListeners {
successListener()
}
}
return err
}

// StoreSafeTx sealed interface
type StoreSafeTx interface {
Tx() *Tx
Expand Down Expand Up @@ -237,6 +215,9 @@ type JobsDB interface {
// that can be used by the provided function.
WithStoreSafeTx(context.Context, func(tx StoreSafeTx) error) error

// WithStoreSafeTxFromTx prepares a store-safe environment for an existing transaction.
WithStoreSafeTxFromTx(ctx context.Context, tx *Tx, f func(tx StoreSafeTx) error) error

// Store stores the provided jobs to the database
Store(ctx context.Context, jobList []*JobT) error

Expand Down Expand Up @@ -1740,6 +1721,12 @@ func (jd *Handle) WithStoreSafeTx(ctx context.Context, f func(tx StoreSafeTx) er
})
}

func (jd *Handle) WithStoreSafeTxFromTx(ctx context.Context, tx *Tx, f func(tx StoreSafeTx) error) error {
return jd.inStoreSafeCtx(ctx, func() error {
return f(&storeSafeTx{tx: tx, identity: jd.tablePrefix})
})
}

func (jd *Handle) inStoreSafeCtx(ctx context.Context, f func() error) error {
// Only locks the list
op := func() error {
Expand Down
1 change: 1 addition & 0 deletions jobsdb/jobsdb_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/destination"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
)

func TestBackupTable(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions jobsdb/jobsdb_renameDs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

rsRand "github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
)

func Test_mustRenameDS(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions jobsdb/jobsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/rudderlabs/rudder-server/jobsdb/prebackup"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
)

var _ = Describe("Calculate newDSIdx for internal migrations", Ordered, func() {
Expand Down
1 change: 1 addition & 0 deletions jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/rudderlabs/rudder-server/jobsdb/internal/dsindex"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
)

// startMigrateDSLoop migrates jobs from src dataset (srcDS) to destination dataset (dest_ds)
Expand Down
17 changes: 16 additions & 1 deletion mocks/jobsdb/mock_jobsdb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions mocks/utils/types/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/workerpool"
)
Expand Down Expand Up @@ -2131,7 +2132,7 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
}

if proc.isReportingEnabled() {
if err = proc.reporting.Report(in.reportMetrics, tx.SqlTx()); err != nil {
if err = proc.reporting.Report(in.reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down Expand Up @@ -2573,7 +2574,7 @@ func (proc *Handle) transformSrcDest(
}
}

func (proc *Handle) saveDroppedJobs(failedJobs []*jobsdb.JobT, tx *jobsdb.Tx) error {
func (proc *Handle) saveDroppedJobs(failedJobs []*jobsdb.JobT, tx *Tx) error {
if len(failedJobs) > 0 {
rsourcesStats := rsources.NewDroppedJobsCollector(proc.rsourcesService)
rsourcesStats.JobsDropped(failedJobs)
Expand Down
5 changes: 3 additions & 2 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
testutils "github.com/rudderlabs/rudder-server/utils/tests"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -2024,8 +2025,8 @@ var _ = Describe("Processor", Ordered, func() {
StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes()

c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
_ = f(&jobsdb.Tx{})
c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *Tx) error) {
_ = f(&Tx{})
}).Return(nil).Times(0)

// One Store call is expected for all events
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (brt *Handle) updateJobStatus(batchJobs *BatchedJobs, isWarehouse bool, err
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.SqlTx()); err != nil {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions router/batchrouter/handle_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string,
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.SqlTx()); err != nil {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {

Check warning on line 61 in router/batchrouter/handle_async.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/handle_async.go#L61

Added line #L61 was not covered by tests
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at
}

if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.SqlTx()); err != nil {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {

Check warning on line 691 in router/batchrouter/handle_async.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/handle_async.go#L691

Added line #L691 was not covered by tests
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion router/batchrouter/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin
return fmt.Errorf("marking %s job statuses as aborted: %w", brt.destType, err)
}
if brt.reporting != nil && brt.reportingEnabled {
if err = brt.reporting.Report(reportMetrics, tx.SqlTx()); err != nil {
if err = brt.reporting.Report(reportMetrics, tx.Tx()); err != nil {
return fmt.Errorf("reporting metrics: %w", err)
}
}
Expand Down
Loading

0 comments on commit b90288b

Please sign in to comment.