Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: error index reporter improvements #3994

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
a.log.Infof("Configured deployment type: %q", deploymentType)

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config)})
g.Go(func() error {
syncer()
Expand Down
1 change: 1 addition & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
a.log.Infof("Configured deployment type: %q", deploymentType)

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
defer reporting.Stop()

Check warning on line 110 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L110

Added line #L110 was not covered by tests
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config.Default)})
g.Go(misc.WithBugsnag(func() error {
syncer()
Expand Down
124 changes: 86 additions & 38 deletions enterprise/reporting/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
"database/sql"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand All @@ -32,54 +33,37 @@
type ErrorIndexReporter struct {
ctx context.Context
log logger.Logger
conf *config.Config
configSubscriber *configSubscriber
errIndexDB *jobsdb.Handle
now func() time.Time
dbsMu sync.RWMutex
dbs map[string]*handleWithSqlDB
}

config struct {
dsLimit misc.ValueLoader[int]
skipMaintenanceError bool
jobRetention time.Duration
}
// handleWithSqlDB pairs an error index jobsdb handle with the *sql.DB it was
// created on, so the raw database handle stays reachable alongside the jobsdb.
type handleWithSqlDB struct {
// Handle is the embedded jobsdb handle; its methods are promoted onto handleWithSqlDB.
*jobsdb.Handle
// sqlDB is the database handle given to the jobsdb via jobsdb.WithDBHandle.
// resolveJobsDB runs its identity query against it.
sqlDB *sql.DB
}

func NewErrorIndexReporter(
ctx context.Context,
conf *config.Config,
log logger.Logger,
configSubscriber *configSubscriber,
conf *config.Config,
) *ErrorIndexReporter {
eir := &ErrorIndexReporter{
ctx: ctx,
log: log,
conf: conf,
configSubscriber: configSubscriber,
now: time.Now,
dbs: map[string]*handleWithSqlDB{},
}

eir.config.dsLimit = conf.GetReloadableIntVar(0, 1, "Reporting.errorIndexReporting.dsLimit")
eir.config.skipMaintenanceError = conf.GetBool("Reporting.errorIndexReporting.skipMaintenanceError", false)
eir.config.jobRetention = conf.GetDurationVar(24, time.Hour, "Reporting.errorIndexReporting.jobRetention")

eir.errIndexDB = jobsdb.NewForReadWrite(
"err_idx",
jobsdb.WithDSLimit(eir.config.dsLimit),
jobsdb.WithConfig(conf),
jobsdb.WithSkipMaintenanceErr(eir.config.skipMaintenanceError),
jobsdb.WithJobMaxAge(
func() time.Duration {
return eir.config.jobRetention
},
),
)
if err := eir.errIndexDB.Start(); err != nil {
panic(fmt.Sprintf("failed to start error index db: %v", err))
}

return eir
}

// Report reports the metrics to the errorIndex JobsDB
func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, _ *sql.Tx) error {
func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error {
failedAt := eir.now()

var jobs []*jobsdb.JobT
Expand Down Expand Up @@ -133,21 +117,85 @@
if len(jobs) == 0 {
return nil
}

if err := eir.errIndexDB.Store(eir.ctx, jobs); err != nil {
return fmt.Errorf("failed to store jobs: %v", err)
db, err := eir.resolveJobsDB(tx)
if err != nil {
return fmt.Errorf("failed to resolve jobsdb: %w", err)
}
if err := db.WithStoreSafeTxFromTx(eir.ctx, tx, func(tx jobsdb.StoreSafeTx) error {
return db.StoreInTx(eir.ctx, tx, jobs)
}); err != nil {
return fmt.Errorf("failed to store jobs: %w", err)
}

return nil
}

// DatabaseSyncer makes sure an errorIndex jobsDB is opened and started for the given connection, then returns a syncer that simply blocks until the context is done. The jobsDBs themselves are stopped by Stop.
func (eir *ErrorIndexReporter) DatabaseSyncer(
types.SyncerConfig,
) types.ReportingSyncer {
// DatabaseSyncer ensures that an error index jobsdb exists for c.ConnInfo,
// opening and starting one the first time a connection string is seen, and
// returns a syncer that blocks until eir.ctx is done.
//
// It panics if the database handle cannot be opened or the jobsdb fails to start.
func (eir *ErrorIndexReporter) DatabaseSyncer(c types.SyncerConfig) types.ReportingSyncer {
// The write lock is held for the whole registration because eir.dbs is mutated below.
eir.dbsMu.Lock()
defer eir.dbsMu.Unlock()
// Handles are keyed by connection string; a repeated call with the same
// ConnInfo creates nothing new and only returns the syncer.
if _, ok := eir.dbs[c.ConnInfo]; !ok {
// Per the database/sql documentation, sql.Open may only validate its
// arguments without connecting to the database.
dbHandle, err := sql.Open("postgres", c.ConnInfo)
if err != nil {
panic(fmt.Errorf("failed to open error index db: %w", err))

Check warning on line 139 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L139

Added line #L139 was not covered by tests
}
// The jobsdb is built on top of dbHandle rather than opening its own connection.
errIndexDB := jobsdb.NewForReadWrite(
"err_idx",
jobsdb.WithDBHandle(dbHandle),
jobsdb.WithDSLimit(eir.conf.GetReloadableIntVar(0, 1, "Reporting.errorIndexReporting.dsLimit")),
jobsdb.WithConfig(eir.conf),
jobsdb.WithSkipMaintenanceErr(eir.conf.GetBool("Reporting.errorIndexReporting.skipMaintenanceError", false)),
// The retention is looked up from eir.conf each time this callback is invoked.
jobsdb.WithJobMaxAge(
func() time.Duration {
return eir.conf.GetDurationVar(24, time.Hour, "Reporting.errorIndexReporting.jobRetention")
},

Check warning on line 150 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L149-L150

Added lines #L149 - L150 were not covered by tests
),
)
if err := errIndexDB.Start(); err != nil {
panic(fmt.Errorf("failed to start error index db: %w", err))

Check warning on line 154 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L154

Added line #L154 was not covered by tests
}
// Keep the jobsdb handle together with the *sql.DB it was created on.
eir.dbs[c.ConnInfo] = &handleWithSqlDB{
Handle: errIndexDB,
sqlDB: dbHandle,
}
}
// The returned syncer does no work of its own; it only waits for eir.ctx to be done.
return func() {
<-eir.ctx.Done()
}
}

eir.errIndexDB.Stop()
// Stop stops every error index jobsdb handle currently held in eir.dbs.
//
// The read lock is held while iterating, so the set of handles cannot be
// modified by DatabaseSyncer until all of them have been stopped.
//
// NOTE(review): only Handle.Stop is called here; the paired *sql.DB is not
// closed by this method. Confirm whether jobsdb releases it or whether it
// should be closed explicitly.
func (eir *ErrorIndexReporter) Stop() {
	eir.dbsMu.RLock()
	defer eir.dbsMu.RUnlock()

	for connInfo := range eir.dbs {
		eir.dbs[connInfo].Handle.Stop()
	}
}

// resolveJobsDB returns the jobsdb that matches the current transaction (using system information functions)
// https://www.postgresql.org/docs/11/functions-info.html
//
// When exactly one jobsdb is registered it is returned without running any
// query. Otherwise the identity query is executed on tx and on each registered
// *sql.DB, and the first handle whose identity string equals the transaction's
// is returned. An error is returned if an identity query fails or if no
// registered database matches.
func (eir *ErrorIndexReporter) resolveJobsDB(tx *Tx) (jobsdb.JobsDB, error) {
// Read lock only: eir.dbs is inspected here but never modified.
eir.dbsMu.RLock()
defer eir.dbsMu.RUnlock()

if len(eir.dbs) == 1 { // optimisation, if there is only one jobsdb, return this. If it is the wrong one, it will fail anyway
// The loop is just a way to fetch the single map entry; it returns on the first iteration.
for i := range eir.dbs {
return eir.dbs[i].Handle, nil
}
}

// The identity string concatenates server address, server port, current user,
// database, schema, postmaster start time and server version, separated by ':'.
dbIdentityQuery := `select inet_server_addr()::text || ':' || inet_server_port()::text || ':' || current_user || ':' || current_database() || ':' || current_schema || ':' || pg_postmaster_start_time()::text || ':' || version()`
var txDatabaseIdentity string
if err := tx.QueryRow(dbIdentityQuery).Scan(&txDatabaseIdentity); err != nil {
return nil, fmt.Errorf("failed to get current tx's db identity: %w", err)
}

Check warning on line 189 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L188-L189

Added lines #L188 - L189 were not covered by tests

// NOTE(review): this runs one identity query per registered database on every
// call that reaches this point; consider caching the identities if it proves costly.
for key := range eir.dbs {
var databaseIdentity string
if err := eir.dbs[key].sqlDB.QueryRow(dbIdentityQuery).Scan(&databaseIdentity); err != nil {
return nil, fmt.Errorf("failed to get db identity for %q: %w", key, err)
}

Check warning on line 195 in enterprise/reporting/error_index_reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/error_index_reporting.go#L194-L195

Added lines #L194 - L195 were not covered by tests
if databaseIdentity == txDatabaseIdentity {
return eir.dbs[key].Handle, nil
}
}
return nil, fmt.Errorf("no jobsdb found matching the current transaction")
}
Loading
Loading