Skip to content

Commit

Permalink
chore: add Reporting#Stop
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Oct 19, 2023
1 parent 83fbdfb commit c5fc7e9
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 16 deletions.
1 change: 1 addition & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
a.log.Infof("Configured deployment type: %q", deploymentType)

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config)})
g.Go(func() error {
syncer()
Expand Down
1 change: 1 addition & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
a.log.Infof("Configured deployment type: %q", deploymentType)

reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig)
defer reporting.Stop()
syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config.Default)})
g.Go(misc.WithBugsnag(func() error {
syncer()
Expand Down
9 changes: 5 additions & 4 deletions enterprise/reporting/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ func (eir *ErrorIndexReporter) Report(metrics []*types.PUReportedMetric, tx *Tx)
return nil
}

// DatabaseSyncer returns a syncer that syncs the errorIndex jobsDB. Once the context is done, it stops the errorIndex jobsDB
func (eir *ErrorIndexReporter) DatabaseSyncer(
types.SyncerConfig,
) types.ReportingSyncer {
func (eir *ErrorIndexReporter) DatabaseSyncer(types.SyncerConfig) types.ReportingSyncer {
return func() {
}
}

func (edr *ErrorIndexReporter) Stop() {
// No op
}
17 changes: 14 additions & 3 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ var ErrorDetailReportsColumns = []string{

type ErrorDetailReporter struct {
ctx context.Context
cancel context.CancelFunc
g *errgroup.Group
configSubscriber *configSubscriber
reportingServiceURL string
syncersMu sync.RWMutex
Expand Down Expand Up @@ -98,9 +100,12 @@ func NewErrorDetailReporter(

log := logger.NewLogger().Child("enterprise").Child("error-detail-reporting")
extractor := NewErrorDetailExtractor(log)

ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
return &ErrorDetailReporter{
ctx: ctx,
cancel: cancel,
g: g,
reportingServiceURL: reportingServiceURL,
log: log,
sleepInterval: sleepInterval,
Expand Down Expand Up @@ -140,7 +145,10 @@ func (edr *ErrorDetailReporter) DatabaseSyncer(c types.SyncerConfig) types.Repor
}

return func() {
edr.mainLoop(edr.ctx, c)
edr.g.Go(func() error {
edr.mainLoop(edr.ctx, c)
return nil
})
}
}

Expand Down Expand Up @@ -572,4 +580,7 @@ func (edr *ErrorDetailReporter) sendMetric(ctx context.Context, label string, me
return err
}

// Sending metrics to Reporting service --- ENDS
func (edr *ErrorDetailReporter) Stop() {
edr.cancel()
_ = edr.g.Wait()
}
17 changes: 15 additions & 2 deletions enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@ type Mediator struct {

g *errgroup.Group
ctx context.Context
cancel context.CancelFunc
reporters []types.Reporting
}

func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator {
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)

rm := &Mediator{
log: log,
log: log,
g: g,
ctx: ctx,
cancel: cancel,
}
rm.g, rm.ctx = errgroup.WithContext(ctx)
rm.ctx, rm.cancel = context.WithCancel(ctx)
rm.g, rm.ctx = errgroup.WithContext(rm.ctx)

reportingEnabled := config.GetBool("Reporting.enabled", types.DefaultReportingEnabled)
if enterpriseToken == "" || !reportingEnabled {
Expand Down Expand Up @@ -108,3 +116,8 @@ func (rm *Mediator) DatabaseSyncer(c types.SyncerConfig) types.ReportingSyncer {
_ = rm.g.Wait()
}
}

func (rm *Mediator) Stop() {
rm.cancel()
_ = rm.g.Wait()
}
2 changes: 2 additions & 0 deletions enterprise/reporting/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ func (*NOOP) Report(_ []*types.PUReportedMetric, _ *Tx) error {
func (*NOOP) DatabaseSyncer(c types.SyncerConfig) types.ReportingSyncer {
return func() {}
}

func (*NOOP) Stop() {}
16 changes: 15 additions & 1 deletion enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (

type DefaultReporter struct {
ctx context.Context
cancel context.CancelFunc
g *errgroup.Group
configSubscriber *configSubscriber
syncersMu sync.RWMutex
syncers map[string]*types.SyncSource
Expand Down Expand Up @@ -85,8 +87,12 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber
if whActionsOnly {
log.Info("REPORTING_WH_ACTIONS_ONLY enabled.only sending reports relevant to wh actions.")
}
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
return &DefaultReporter{
ctx: ctx,
cancel: cancel,
g: g,
log: log,
configSubscriber: configSubscriber,
syncers: make(map[string]*types.SyncSource),
Expand Down Expand Up @@ -136,7 +142,10 @@ func (r *DefaultReporter) DatabaseSyncer(c types.SyncerConfig) types.ReportingSy
return func() {}
}
return func() {
r.mainLoop(r.ctx, c)
r.g.Go(func() error {
r.mainLoop(r.ctx, c)
return nil
})
}
}

Expand Down Expand Up @@ -602,3 +611,8 @@ func (r *DefaultReporter) getTags(label string) stats.Tags {
"clientName": label,
}
}

func (r *DefaultReporter) Stop() {
r.cancel()
_ = r.g.Wait()
}
8 changes: 2 additions & 6 deletions enterprise/reporting/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,8 @@ func TestSetupForDelegates(t *testing.T) {
EnterpriseToken: "dummy-token",
}
}
ctx, cancel := context.WithCancel(context.Background())
med := NewReportingMediator(ctx, logger.NOP, f.EnterpriseToken, &backendconfig.NOOP{})
defer func() {
cancel()
_ = med.g.Wait()
}()
med := NewReportingMediator(context.Background(), logger.NOP, f.EnterpriseToken, &backendconfig.NOOP{})
defer med.Stop()
require.Len(t, med.reporters, tc.expectedDelegates)
})

Expand Down
12 changes: 12 additions & 0 deletions mocks/utils/types/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type Reporting interface {

// DatabaseSyncer creates reporting tables in the database and returns a function to periodically sync the data
DatabaseSyncer(c SyncerConfig) ReportingSyncer

// Stop the reporting service
Stop()
}

type ReportingSyncer func()
Expand Down
1 change: 1 addition & 0 deletions warehouse/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func (a *App) Run(ctx context.Context) error {

if !mode.IsStandAloneSlave(a.config.mode) {
a.reporting = a.app.Features().Reporting.Setup(gCtx, a.bcConfig)
defer a.reporting.Stop()
syncer := a.reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: a.connectionString(), Label: types.WarehouseReportingLabel})
g.Go(misc.WithBugsnagForWarehouse(func() error {
syncer()
Expand Down

0 comments on commit c5fc7e9

Please sign in to comment.