fix(jobsdb): update cache after transaction completes (#2567)
atzoum committed Oct 14, 2022
1 parent 3cb2ec6 commit 2d70da7
Showing 10 changed files with 136 additions and 102 deletions.
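
At a glance, the visible hunks show two related API changes around jobsdb transactions: WithTx now hands callers a *jobsdb.Tx wrapper instead of a raw *sql.Tx, and the store/update-safe transaction interfaces expose the underlying database transaction via SqlTx() instead of Tx(). A minimal caller-side sketch, pieced together only from the usages visible in the hunks below (the SQL statement itself is a placeholder):

err := jobsDB.WithTx(func(tx *jobsdb.Tx) error {
	// *sql.Tx methods such as Query/Exec remain available through the embedded transaction.
	if _, err := tx.Exec("UPDATE ..."); err != nil {
		return err
	}
	// Where an explicit *sql.Tx is still required (e.g. rsources.Publish in the
	// processor hunk), the embedded field is passed directly as tx.Tx.
	return rsourcesStats.Publish(ctx, tx.Tx)
})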
2 changes: 1 addition & 1 deletion gateway/gateway.go
@@ -350,7 +350,7 @@ func (gateway *HandleT) dbWriterWorkerProcess() {
// rsources stats
rsourcesStats := rsources.NewStatsCollector(gateway.rsourcesService)
rsourcesStats.JobsStoredWithErrors(jobList, errorMessagesMap)
- return rsourcesStats.Publish(ctx, tx.Tx())
+ return rsourcesStats.Publish(ctx, tx.SqlTx())
})
cancel()
if err != nil {
3 changes: 1 addition & 2 deletions jobsdb/integration_test.go
@@ -2,7 +2,6 @@ package jobsdb

import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
@@ -1391,7 +1390,7 @@ func consume(t testing.TB, db *HandleT, count int) {
func getPayloadSize(t *testing.T, jobsDB JobsDB, job *JobT) (int64, error) {
var size int64
var tables []string
- err := jobsDB.WithTx(func(tx *sql.Tx) error {
+ err := jobsDB.WithTx(func(tx *Tx) error {
rows, err := tx.Query(fmt.Sprintf("SELECT tablename FROM pg_catalog.pg_tables where tablename like '%s_jobs_%%'", jobsDB.Identifier()))
require.NoError(t, err)
for rows.Next() {
178 changes: 109 additions & 69 deletions jobsdb/jobsdb.go

Large diffs are not rendered by default.
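
The heart of the fix lives in this file. The commit message ("update cache after transaction completes") together with the new Tx type used throughout the other hunks suggests a wrapper that defers side effects, such as jobsdb cache updates, until the transaction has actually committed. A rough sketch of one plausible shape for that wrapper, assuming a success-listener mechanism; the method names below are assumptions, not copied from the commit:

// Tx wraps *sql.Tx so callers keep the familiar Query/Exec surface while jobsdb
// can register callbacks that must only run after a successful commit
// (for example, updating its in-memory cache).
type Tx struct {
	*sql.Tx
	successListeners []func()
}

// AddSuccessListener registers a callback to run once the transaction commits.
// (Assumed name; the real definition is in jobsdb/jobsdb.go, not rendered here.)
func (tx *Tx) AddSuccessListener(listener func()) {
	tx.successListeners = append(tx.successListeners, listener)
}

// Commit commits the underlying transaction and, only on success, fires the
// registered listeners, i.e. the cache is updated after the transaction completes.
func (tx *Tx) Commit() error {
	if err := tx.Tx.Commit(); err != nil {
		return err
	}
	for _, listener := range tx.successListeners {
		listener()
	}
	return nil
}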

9 changes: 4 additions & 5 deletions jobsdb/jobsdb_backup_test.go
@@ -4,7 +4,6 @@ import (
"bufio"
"compress/gzip"
"context"
"database/sql"
"io"
"os"
"strings"
@@ -177,7 +176,7 @@ func (*backupTestCase) insertRTData(t *testing.T, jobs []*JobT, statusList []*Jo
require.NoError(t, err)

rtDS := newDataSet("rt", "1")
- err = jobsDB.WithTx(func(tx *sql.Tx) error {
+ err = jobsDB.WithTx(func(tx *Tx) error {
if err := jobsDB.copyJobsDS(tx, rtDS, jobs); err != nil {
return err
}
@@ -190,7 +189,7 @@ func (*backupTestCase) insertRTData(t *testing.T, jobs []*JobT, statusList []*Jo
jobsDB.dsListLock.WithLock(func(l lock.LockToken) {
jobsDB.addNewDS(l, rtDS2)
})
- err = jobsDB.WithTx(func(tx *sql.Tx) error {
+ err = jobsDB.WithTx(func(tx *Tx) error {
if err := jobsDB.copyJobsDS(tx, rtDS2, jobs); err != nil {
return err
}
@@ -214,7 +213,7 @@ func (*backupTestCase) insertBatchRTData(t *testing.T, jobs []*JobT, statusList
require.NoError(t, err)

ds := newDataSet("batch_rt", "1")
- err = jobsDB.WithTx(func(tx *sql.Tx) error {
+ err = jobsDB.WithTx(func(tx *Tx) error {
if err := jobsDB.copyJobsDS(tx, ds, jobs); err != nil {
t.Log("error while copying jobs to ds: ", err)
return err
@@ -227,7 +226,7 @@ func (*backupTestCase) insertBatchRTData(t *testing.T, jobs []*JobT, statusList
jobsDB.dsListLock.WithLock(func(l lock.LockToken) {
jobsDB.addNewDS(l, ds2)
})
- err = jobsDB.WithTx(func(tx *sql.Tx) error {
+ err = jobsDB.WithTx(func(tx *Tx) error {
if err := jobsDB.copyJobsDS(tx, ds2, jobs); err != nil {
t.Log("error while copying jobs to ds: ", err)
return err
3 changes: 1 addition & 2 deletions jobsdb/jobsdb_test.go
@@ -3,7 +3,6 @@ package jobsdb
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
@@ -394,7 +393,7 @@ func TestRefreshDSList(t *testing.T) {
defer jobsDB.TearDown()

require.Equal(t, 1, len(jobsDB.getDSList()), "jobsDB should start with a ds list size of 1")
- require.NoError(t, jobsDB.WithTx(func(tx *sql.Tx) error {
+ require.NoError(t, jobsDB.WithTx(func(tx *Tx) error {
return jobsDB.addDSInTx(tx, newDataSet(prefix, "2"))
}))
require.Equal(t, 1, len(jobsDB.getDSList()), "addDS should not refresh the ds list")
3 changes: 1 addition & 2 deletions mocks/jobsdb/mock_jobsdb.go

Some generated files are not rendered by default.

13 changes: 6 additions & 7 deletions processor/processor.go
@@ -3,7 +3,6 @@ package processor
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
@@ -1681,13 +1680,13 @@ func (proc *HandleT) Store(in *storeMessage) {

// rsources stats
in.rsourcesStats.JobStatusesUpdated(statusList)
- err = in.rsourcesStats.Publish(ctx, tx.Tx())
+ err = in.rsourcesStats.Publish(ctx, tx.SqlTx())
if err != nil {
return fmt.Errorf("publishing rsources stats: %w", err)
}

if proc.isReportingEnabled() {
- proc.reporting.Report(in.reportMetrics, tx.Tx())
+ proc.reporting.Report(in.reportMetrics, tx.SqlTx())
}

if enableDedup {
@@ -2111,10 +2110,10 @@ func (proc *HandleT) saveFailedJobs(failedJobs []*jobsdb.JobT) {

rsourcesStats := rsources.NewFailedJobsCollector(proc.rsourcesService)
rsourcesStats.JobsFailed(failedJobs)
- _ = proc.errorDB.WithTx(func(tx *sql.Tx) error {
+ _ = proc.errorDB.WithTx(func(tx *jobsdb.Tx) error {
// TODO: error propagation
- router.GetFailedEventsManager().SaveFailedRecordIDs(jobRunIDAbortedEventsMap, tx)
- return rsourcesStats.Publish(context.TODO(), tx)
+ router.GetFailedEventsManager().SaveFailedRecordIDs(jobRunIDAbortedEventsMap, tx.Tx)
+ return rsourcesStats.Publish(context.TODO(), tx.Tx)
})

}
@@ -2587,6 +2586,6 @@ func (proc *HandleT) isReportingEnabled() bool {
func (proc *HandleT) updateRudderSourcesStats(ctx context.Context, tx jobsdb.StoreSafeTx, jobs []*jobsdb.JobT) error {
rsourcesStats := rsources.NewStatsCollector(proc.rsourcesService)
rsourcesStats.JobsStored(jobs)
- err := rsourcesStats.Publish(ctx, tx.Tx())
+ err := rsourcesStats.Publish(ctx, tx.SqlTx())
return err
}
13 changes: 6 additions & 7 deletions processor/processor_test.go
@@ -2,7 +2,6 @@ package processor

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"reflect"
@@ -973,8 +972,8 @@ var _ = Describe("Processor", func() {
})

// will be used to save failed events to failed keys table
- c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *sql.Tx) error) {
- _ = f(nil)
+ c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
+ _ = f(&jobsdb.Tx{})
}).Times(1)

// One Store call is expected for all events
@@ -1105,8 +1104,8 @@ var _ = Describe("Processor", func() {
assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State)
})

- c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *sql.Tx) error) {
- _ = f(nil)
+ c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
+ _ = f(&jobsdb.Tx{})
}).Return(nil).Times(1)

// One Store call is expected for all events
@@ -1183,8 +1182,8 @@ var _ = Describe("Processor", func() {
assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State)
})

- c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *sql.Tx) error) {
- _ = f(nil)
+ c.mockProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
+ _ = f(&jobsdb.Tx{})
}).Return(nil).Times(0)

// One Store call is expected for all events
8 changes: 4 additions & 4 deletions router/batchrouter/batchrouter.go
@@ -1384,10 +1384,10 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc

// Save msgids of aborted jobs
if len(jobRunIDAbortedEventsMap) > 0 {
- router.GetFailedEventsManager().SaveFailedRecordIDs(jobRunIDAbortedEventsMap, tx.Tx())
+ router.GetFailedEventsManager().SaveFailedRecordIDs(jobRunIDAbortedEventsMap, tx.SqlTx())
}
if brt.reporting != nil && brt.reportingEnabled {
- brt.reporting.Report(reportMetrics, tx.Tx())
+ brt.reporting.Report(reportMetrics, tx.SqlTx())
}
return nil
})
@@ -1495,7 +1495,7 @@ func (brt *HandleT) setMultipleJobStatus(asyncOutput asyncdestinationmanager.Asy
}
// rsources stats
rsourcesStats.JobStatusesUpdated(statusList)
- err = rsourcesStats.Publish(context.TODO(), tx.Tx())
+ err = rsourcesStats.Publish(context.TODO(), tx.SqlTx())
if err != nil {
brt.logger.Errorf("publishing rsources stats: %w", err)
}
@@ -2406,7 +2406,7 @@ func (brt *HandleT) updateRudderSourcesStats(ctx context.Context, tx jobsdb.Upda
rsourcesStats := rsources.NewStatsCollector(brt.rsourcesService)
rsourcesStats.BeginProcessing(jobs)
rsourcesStats.JobStatusesUpdated(jobStatuses)
- err := rsourcesStats.Publish(ctx, tx.Tx())
+ err := rsourcesStats.Publish(ctx, tx.SqlTx())
if err != nil {
return fmt.Errorf("publishing rsources stats: %w", err)
}
6 changes: 3 additions & 3 deletions router/router.go
@@ -1536,9 +1536,9 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {

// Save msgids of aborted jobs
if len(jobRunIDAbortedEventsMap) > 0 {
- GetFailedEventsManager().SaveFailedRecordIDs(jobRunIDAbortedEventsMap, tx.Tx())
+ GetFailedEventsManager().SaveFailedRecordIDs(jobRunIDAbortedEventsMap, tx.SqlTx())
}
- rt.Reporting.Report(reportMetrics, tx.Tx())
+ rt.Reporting.Report(reportMetrics, tx.SqlTx())
return nil
})
}, sendRetryStoreStats)
@@ -2219,7 +2219,7 @@ func (rt *HandleT) updateRudderSourcesStats(ctx context.Context, tx jobsdb.Updat
rsourcesStats := rsources.NewStatsCollector(rt.rsourcesService)
rsourcesStats.BeginProcessing(jobs)
rsourcesStats.JobStatusesUpdated(jobStatuses)
- err := rsourcesStats.Publish(ctx, tx.Tx())
+ err := rsourcesStats.Publish(ctx, tx.SqlTx())
if err != nil {
rt.logger.Errorf("publishing rsources stats: %w", err)
}
