Skip to content

Commit

Permalink
chore: using different jobsdbs for different syncers in error index r…
Browse files Browse the repository at this point in the history
…eporter
  • Loading branch information
atzoum committed Oct 19, 2023
1 parent c5fc7e9 commit d0b3a79
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 66 deletions.
94 changes: 84 additions & 10 deletions enterprise/reporting/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package reporting

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
Expand All @@ -30,25 +33,32 @@ type payload struct {
// ErrorIndexReporter stores reported failures as jobs in an error index jobsdb.
// NOTE(review): this listing is a rendered diff without +/- markers, so removed and
// added fields appear side by side; errIndexDB looks like the pre-change field that
// dbs replaces — confirm against the real file.
type ErrorIndexReporter struct {
// ctx is the context handed to the jobsdb calls made by the reporter.
ctx context.Context
log logger.Logger
// conf supplies the Reporting.errorIndexReporting.* settings used when a jobsdb is created.
conf *config.Config
configSubscriber *configSubscriber
errIndexDB jobsdb.JobsDB
// now is the reporter's clock; the constructor sets it to time.Now.
now func() time.Time
// dbsMu guards dbs.
dbsMu sync.RWMutex
// dbs holds one jobsdb handle per syncer connection string (SyncerConfig.ConnInfo).
dbs map[string]*handleWithSqlDB
}

// handleWithSqlDB pairs a jobsdb handle with the *sql.DB connection pool it was built on.
type handleWithSqlDB struct {
*jobsdb.Handle
// sqlDB is the pool opened from the syncer's connection string; resolveJobsDB
// queries it directly to work out which database the handle points at.
sqlDB *sql.DB
}

// NewErrorIndexReporter creates an ErrorIndexReporter that uses time.Now as its
// clock and starts with an empty set of jobsdb handles.
// NOTE(review): this listing is a rendered diff without +/- markers; the errIndexDB
// parameter and the eir.errIndexDB assignment appear to be the pre-change lines that
// the conf parameter replaces — confirm against the real file.
func NewErrorIndexReporter(
ctx context.Context,
log logger.Logger,
configSubscriber *configSubscriber,
errIndexDB jobsdb.JobsDB,
conf *config.Config,
) *ErrorIndexReporter {
eir := &ErrorIndexReporter{
ctx: ctx,
log: log,
conf: conf,
configSubscriber: configSubscriber,
now: time.Now,
// Non-nil map so that DatabaseSyncer can insert into it directly.
dbs: map[string]*handleWithSqlDB{},
}

eir.errIndexDB = errIndexDB
return eir
}

Expand Down Expand Up @@ -107,21 +117,85 @@ func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx)
if len(jobs) == 0 {
return nil
}

if err := eir.errIndexDB.WithStoreSafeTxFromTx(eir.ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return eir.errIndexDB.StoreInTx(eir.ctx, tx, jobs)
db, err := eir.resolveJobsDB(tx)
if err != nil {
return fmt.Errorf("failed to resolve jobsdb: %w", err)
}
if err := db.WithStoreSafeTxFromTx(eir.ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return db.StoreInTx(eir.ctx, tx, jobs)
}); err != nil {
return fmt.Errorf("failed to store jobs: %v", err)
}

return nil
}

func (eir *ErrorIndexReporter) DatabaseSyncer(types.SyncerConfig) types.ReportingSyncer {
func (eir *ErrorIndexReporter) DatabaseSyncer(c types.SyncerConfig) types.ReportingSyncer {
eir.dbsMu.Lock()
defer eir.dbsMu.Unlock()
if _, ok := eir.dbs[c.ConnInfo]; !ok {
dbHandle, err := sql.Open("postgres", c.ConnInfo)
if err != nil {
panic(fmt.Errorf("failed to open error index db: %w", err))

Check warning on line 139 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L139

Added line #L139 was not covered by tests
}
errIndexDB := jobsdb.NewForReadWrite(
"err_idx",
jobsdb.WithDBHandle(dbHandle),
jobsdb.WithDSLimit(eir.conf.GetReloadableIntVar(0, 1, "Reporting.errorIndexReporting.dsLimit")),
jobsdb.WithConfig(eir.conf),
jobsdb.WithSkipMaintenanceErr(eir.conf.GetBool("Reporting.errorIndexReporting.skipMaintenanceError", false)),
jobsdb.WithJobMaxAge(
func() time.Duration {
return eir.conf.GetDurationVar(24, time.Hour, "Reporting.errorIndexReporting.jobRetention")
},

Check warning on line 150 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L149-L150

Added lines #L149 - L150 were not covered by tests
),
)
if err := errIndexDB.Start(); err != nil {
panic(fmt.Errorf("failed to start error index db: %w", err))

Check warning on line 154 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L154

Added line #L154 was not covered by tests
}
eir.dbs[c.ConnInfo] = &handleWithSqlDB{
Handle: errIndexDB,
sqlDB: dbHandle,
}
}
return func() {
}
}

func (edr *ErrorIndexReporter) Stop() {
// No op
// Stop shuts down every jobsdb started by DatabaseSyncer and then closes the
// connection pool that was opened for it.
//
// The pools are created by this reporter with sql.Open and only handed to
// jobsdb through WithDBHandle, so the reporter is the one that has to release
// them. Closing a pool that is already closed is harmless, so calling Stop more
// than once does not fail on that account.
func (eir *ErrorIndexReporter) Stop() {
	eir.dbsMu.RLock()
	defer eir.dbsMu.RUnlock()
	for _, db := range eir.dbs {
		db.Handle.Stop()
		// The connection string is deliberately left out of the log line: it
		// may carry credentials.
		if err := db.sqlDB.Close(); err != nil {
			eir.log.Warnf("failed to close error index db connection pool: %v", err)
		}
	}
}

// resolveJobsDB returns the jobsdb that matches the current transaction (using system information functions)
// https://www.postgresql.org/docs/11/functions-info.html
func (eir *ErrorIndexReporter) resolveJobsDB(tx *Tx) (jobsdb.JobsDB, error) {
eir.dbsMu.RLock()
defer eir.dbsMu.RUnlock()

if len(eir.dbs) == 1 { // optimisation, if there is only one jobsdb, return this. If it is the wrong one, it will fail anyway
for i := range eir.dbs {
return eir.dbs[i].Handle, nil
}
}

dbIdentityQuery := `select inet_server_addr()::text || ':' || inet_server_port()::text || ':' || current_user || ':' || current_database() || ':' || current_schema || ':' || pg_postmaster_start_time()::text || ':' || version()`
var txDatabaseIdentity string
if err := tx.QueryRow(dbIdentityQuery).Scan(&txDatabaseIdentity); err != nil {
return nil, fmt.Errorf("failed to get current tx's db identity: %w", err)
}

Check warning on line 189 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L188-L189

Added lines #L188 - L189 were not covered by tests

for key := range eir.dbs {
var databaseIdentity string
if err := eir.dbs[key].sqlDB.QueryRow(dbIdentityQuery).Scan(&databaseIdentity); err != nil {
return nil, fmt.Errorf("failed to get db identity for %q: %w", key, err)
}

Check warning on line 195 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L194-L195

Added lines #L194 - L195 were not covered by tests
if databaseIdentity == txDatabaseIdentity {
return eir.dbs[key].Handle, nil
}
}
return nil, fmt.Errorf("no jobsdb found matching the current transaction")
}
192 changes: 164 additions & 28 deletions enterprise/reporting/error_index_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,25 +244,17 @@ func TestErrorIndexReporter(t *testing.T) {
require.NoError(t, err)

c := config.New()
c.Set("DB.port", postgresContainer.Port)
c.Set("DB.user", postgresContainer.User)
c.Set("DB.name", postgresContainer.Database)
c.Set("DB.password", postgresContainer.Password)

ctx, cancel := context.WithCancel(ctx)

cs := newConfigSubscriber(logger.NOP)

subscribeDone := make(chan struct{})
go func() {
defer close(subscribeDone)
cs.Subscribe(ctx, mockBackendConfig)
}()

errIndexDB := jobsdb.NewForReadWrite("err_idx", jobsdb.WithConfig(c))
require.NoError(t, errIndexDB.Start())
defer errIndexDB.TearDown()
eir := NewErrorIndexReporter(ctx, logger.NOP, cs, errIndexDB)
eir := NewErrorIndexReporter(ctx, logger.NOP, cs, c)
_ = eir.DatabaseSyncer(types.SyncerConfig{ConnInfo: postgresContainer.DBDsn})
defer eir.Stop()

eir.now = failedAt
sqltx, err := postgresContainer.DB.Begin()
Expand All @@ -271,7 +263,9 @@ func TestErrorIndexReporter(t *testing.T) {
err = eir.Report(tc.reports, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())
jr, err := eir.errIndexDB.GetUnprocessed(ctx, jobsdb.GetQueryParams{
db, err := eir.resolveJobsDB(tx)
require.NoError(t, err)
jr, err := db.GetUnprocessed(ctx, jobsdb.GetQueryParams{
JobsLimit: 100,
})
require.NoError(t, err)
Expand Down Expand Up @@ -304,31 +298,23 @@ func TestErrorIndexReporter(t *testing.T) {
})
}
})
t.Run("Graceful shutdown", func(t *testing.T) {
t.Run("graceful shutdown", func(t *testing.T) {
postgresContainer, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)

c := config.New()
c.Set("DB.port", postgresContainer.Port)
c.Set("DB.user", postgresContainer.User)
c.Set("DB.name", postgresContainer.Database)
c.Set("DB.password", postgresContainer.Password)

ctx, cancel := context.WithCancel(ctx)

cs := newConfigSubscriber(logger.NOP)

subscribeDone := make(chan struct{})
go func() {
defer close(subscribeDone)

cs.Subscribe(ctx, mockBackendConfig)
}()

errIndexDB := jobsdb.NewForReadWrite("err_idx", jobsdb.WithConfig(c))
require.NoError(t, errIndexDB.Start())
defer errIndexDB.TearDown()
eir := NewErrorIndexReporter(ctx, logger.NOP, cs, errIndexDB)
eir := NewErrorIndexReporter(ctx, logger.NOP, cs, c)
defer eir.Stop()
syncer := eir.DatabaseSyncer(types.SyncerConfig{ConnInfo: postgresContainer.DBDsn})

sqltx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqltx}
Expand All @@ -339,14 +325,164 @@ func TestErrorIndexReporter(t *testing.T) {
syncerDone := make(chan struct{})
go func() {
defer close(syncerDone)

syncer := eir.DatabaseSyncer(types.SyncerConfig{})
syncer()
}()

cancel()

<-subscribeDone
<-syncerDone
})

t.Run("using 1 syncer", func(t *testing.T) {
t.Run("wrong transaction", func(t *testing.T) {
pg1, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)
pg2, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)

c := config.New()
ctx, cancel := context.WithCancel(ctx)
cs := newConfigSubscriber(logger.NOP)
subscribeDone := make(chan struct{})
go func() {
defer close(subscribeDone)
cs.Subscribe(ctx, mockBackendConfig)
}()

eir := NewErrorIndexReporter(ctx, logger.NOP, cs, c)
defer eir.Stop()
_ = eir.DatabaseSyncer(types.SyncerConfig{ConnInfo: pg1.DBDsn})

sqltx, err := pg2.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqltx}
err = eir.Report([]*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
TransformationID: transformationID,
TrackingPlanID: trackingPlanID,
},
PUDetails: types.PUDetails{
PU: reportedBy,
},
StatusDetail: &types.StatusDetail{
EventName: eventName,
EventType: eventType,
FailedMessages: []*types.FailedMessage{
{
MessageID: messageID + "1",
ReceivedAt: receivedAt.Add(1 * time.Hour),
},
{
MessageID: messageID + "2",
ReceivedAt: receivedAt.Add(2 * time.Hour),
},
},
},
},
}, tx)
require.Error(t, err)
require.Error(t, tx.Commit())

cancel()
<-subscribeDone
})
})

t.Run("using 2 syncers", func(t *testing.T) {
pg1, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)
pg2, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)
pg3, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)

c := config.New()
ctx, cancel := context.WithCancel(ctx)
cs := newConfigSubscriber(logger.NOP)
subscribeDone := make(chan struct{})
go func() {
defer close(subscribeDone)
cs.Subscribe(ctx, mockBackendConfig)
}()

eir := NewErrorIndexReporter(ctx, logger.NOP, cs, c)
defer eir.Stop()
_ = eir.DatabaseSyncer(types.SyncerConfig{ConnInfo: pg1.DBDsn})
_ = eir.DatabaseSyncer(types.SyncerConfig{ConnInfo: pg2.DBDsn})

t.Run("correct transaction", func(t *testing.T) {
sqltx, err := pg1.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqltx}
err = eir.Report([]*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
TransformationID: transformationID,
TrackingPlanID: trackingPlanID,
},
PUDetails: types.PUDetails{
PU: reportedBy,
},
StatusDetail: &types.StatusDetail{
EventName: eventName,
EventType: eventType,
FailedMessages: []*types.FailedMessage{
{
MessageID: messageID + "1",
ReceivedAt: receivedAt.Add(1 * time.Hour),
},
{
MessageID: messageID + "2",
ReceivedAt: receivedAt.Add(2 * time.Hour),
},
},
},
},
}, tx)
require.NoError(t, err)
require.NoError(t, tx.Commit())
})
t.Run("wrong transaction", func(t *testing.T) {
sqltx, err := pg3.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqltx}
err = eir.Report([]*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
TransformationID: transformationID,
TrackingPlanID: trackingPlanID,
},
PUDetails: types.PUDetails{
PU: reportedBy,
},
StatusDetail: &types.StatusDetail{
EventName: eventName,
EventType: eventType,
FailedMessages: []*types.FailedMessage{
{
MessageID: messageID + "1",
ReceivedAt: receivedAt.Add(1 * time.Hour),
},
{
MessageID: messageID + "2",
ReceivedAt: receivedAt.Add(2 * time.Hour),
},
},
},
},
}, tx)
require.Error(t, err)
require.NoError(t, tx.Commit())
})

cancel()
<-subscribeDone
})
}
Loading

0 comments on commit d0b3a79

Please sign in to comment.