Skip to content

Commit

Permalink
chore: error index cleanup should not depend on syncers
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Oct 19, 2023
1 parent b90288b commit 83fbdfb
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 53 deletions.
35 changes: 3 additions & 32 deletions enterprise/reporting/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (

"github.com/google/uuid"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand All @@ -33,21 +31,15 @@ type ErrorIndexReporter struct {
ctx context.Context
log logger.Logger
configSubscriber *configSubscriber
errIndexDB *jobsdb.Handle
errIndexDB jobsdb.JobsDB
now func() time.Time

config struct {
dsLimit misc.ValueLoader[int]
skipMaintenanceError bool
jobRetention time.Duration
}
}

func NewErrorIndexReporter(
ctx context.Context,
conf *config.Config,
log logger.Logger,
configSubscriber *configSubscriber,
errIndexDB jobsdb.JobsDB,
) *ErrorIndexReporter {
eir := &ErrorIndexReporter{
ctx: ctx,
Expand All @@ -56,25 +48,7 @@ func NewErrorIndexReporter(
now: time.Now,
}

eir.config.dsLimit = conf.GetReloadableIntVar(0, 1, "Reporting.errorIndexReporting.dsLimit")
eir.config.skipMaintenanceError = conf.GetBool("Reporting.errorIndexReporting.skipMaintenanceError", false)
eir.config.jobRetention = conf.GetDurationVar(24, time.Hour, "Reporting.errorIndexReporting.jobRetention")

eir.errIndexDB = jobsdb.NewForReadWrite(
"err_idx",
jobsdb.WithDSLimit(eir.config.dsLimit),
jobsdb.WithConfig(conf),
jobsdb.WithSkipMaintenanceErr(eir.config.skipMaintenanceError),
jobsdb.WithJobMaxAge(
func() time.Duration {
return eir.config.jobRetention
},
),
)
if err := eir.errIndexDB.Start(); err != nil {
panic(fmt.Sprintf("failed to start error index db: %v", err))
}

eir.errIndexDB = errIndexDB
return eir
}

Expand Down Expand Up @@ -148,8 +122,5 @@ func (eir *ErrorIndexReporter) DatabaseSyncer(
types.SyncerConfig,
) types.ReportingSyncer {
return func() {
<-eir.ctx.Done()

eir.errIndexDB.Stop()
}
}
20 changes: 9 additions & 11 deletions enterprise/reporting/error_index_reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,12 @@ func TestErrorIndexReporter(t *testing.T) {
cs.Subscribe(ctx, mockBackendConfig)
}()

eir := NewErrorIndexReporter(ctx, c, logger.NOP, cs)
errIndexDB := jobsdb.NewForReadWrite("err_idx", jobsdb.WithConfig(c))
require.NoError(t, errIndexDB.Start())
defer errIndexDB.TearDown()
eir := NewErrorIndexReporter(ctx, logger.NOP, cs, errIndexDB)

eir.now = failedAt
defer func() {
eir.errIndexDB.TearDown()
}()
sqltx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqltx}
Expand Down Expand Up @@ -303,11 +304,6 @@ func TestErrorIndexReporter(t *testing.T) {
})
}
})
t.Run("panic in case of not able to start errIndexDB", func(t *testing.T) {
require.Panics(t, func() {
_ = NewErrorIndexReporter(ctx, config.New(), logger.NOP, newConfigSubscriber(logger.NOP))
})
})
t.Run("Graceful shutdown", func(t *testing.T) {
postgresContainer, err := resource.SetupPostgres(pool, t)
require.NoError(t, err)
Expand All @@ -329,8 +325,10 @@ func TestErrorIndexReporter(t *testing.T) {
cs.Subscribe(ctx, mockBackendConfig)
}()

eir := NewErrorIndexReporter(ctx, c, logger.NOP, cs)
defer eir.errIndexDB.TearDown()
errIndexDB := jobsdb.NewForReadWrite("err_idx", jobsdb.WithConfig(c))
require.NoError(t, errIndexDB.Start())
defer errIndexDB.TearDown()
eir := NewErrorIndexReporter(ctx, logger.NOP, cs, errIndexDB)
sqltx, err := postgresContainer.DB.Begin()
require.NoError(t, err)
tx := &Tx{Tx: sqltx}
Expand Down
26 changes: 25 additions & 1 deletion enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package reporting

import (
"context"
"fmt"
"time"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -49,8 +52,29 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke

// error index reporting implementation
if config.GetBool("Reporting.errorIndexReporting.enabled", false) {
errorIndexReporter := NewErrorIndexReporter(rm.ctx, config.Default, rm.log, configSubscriber)
conf := config.Default
errIndexDB := jobsdb.NewForReadWrite(
"err_idx",
jobsdb.WithDSLimit(conf.GetReloadableIntVar(0, 1, "Reporting.errorIndexReporting.dsLimit")),
jobsdb.WithConfig(conf),
jobsdb.WithSkipMaintenanceErr(conf.GetBool("Reporting.errorIndexReporting.skipMaintenanceError", false)),
jobsdb.WithJobMaxAge(
func() time.Duration {
return conf.GetDurationVar(24, time.Hour, "Reporting.errorIndexReporting.jobRetention")
},

Check warning on line 64 in enterprise/reporting/mediator.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/mediator.go#L63-L64

Added lines #L63 - L64 were not covered by tests
),
)
if err := errIndexDB.Start(); err != nil {
panic(fmt.Sprintf("failed to start error index db: %v", err))

Check warning on line 68 in enterprise/reporting/mediator.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/mediator.go#L68

Added line #L68 was not covered by tests
}
errorIndexReporter := NewErrorIndexReporter(rm.ctx, rm.log, configSubscriber, errIndexDB)
rm.reporters = append(rm.reporters, errorIndexReporter)
rm.g.Go(func() error {
// Once the context is done, it stops the errorIndex jobsDB
<-rm.ctx.Done()
errIndexDB.Stop()
return nil
})
}

return rm
Expand Down
13 changes: 4 additions & 9 deletions enterprise/reporting/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/utils/types"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -124,15 +123,11 @@ func TestSetupForDelegates(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
med := NewReportingMediator(ctx, logger.NOP, f.EnterpriseToken, &backendconfig.NOOP{})
defer func() {
cancel()
_ = med.g.Wait()
}()
require.Len(t, med.reporters, tc.expectedDelegates)
// TODO: error index reporting jobsdb should start with the syncer
// so that we shouldn't need to start the database syncer here and end it just for cleanup
cancel()
syncer := med.DatabaseSyncer(types.SyncerConfig{
Label: "test",
ConnInfo: postgresContainer.DBDsn,
})
syncer()
})

}
Expand Down

0 comments on commit 83fbdfb

Please sign in to comment.