Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track event delivery stats #3974

Merged
merged 38 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7fc3394
feat: track event delivery stats
gane5hvarma Oct 12, 2023
a9bc0b0
change package name
gane5hvarma Oct 12, 2023
98e2a1f
resolve merge conflicts
gane5hvarma Oct 13, 2023
a30f696
add basic test to check metrics
gane5hvarma Oct 14, 2023
1708089
rm fmt
gane5hvarma Oct 14, 2023
a64b907
make fmt
gane5hvarma Oct 14, 2023
58439a5
make fmt
gane5hvarma Oct 14, 2023
a99bc70
fix setup test
gane5hvarma Oct 14, 2023
14c097a
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 17, 2023
1118a95
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 18, 2023
ba2ba50
Merge branch 'master' into feat.unifiedPipelineMetrics
fracasula Oct 18, 2023
14932f9
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 18, 2023
a4053c9
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 18, 2023
4e2fde9
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 19, 2023
b0f9a4b
add tx listener to caputre event stats
gane5hvarma Oct 30, 2023
91a9275
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Oct 30, 2023
2c633fe
remove unused imports
gane5hvarma Oct 30, 2023
f785db0
fmt file
gane5hvarma Oct 31, 2023
1c7ecbc
fix tests in reporting
gane5hvarma Oct 31, 2023
d0ba83b
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 31, 2023
736e784
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 1, 2023
8dcef04
resolve pr comments
gane5hvarma Nov 1, 2023
259adc8
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Nov 1, 2023
5bde9ad
resolve pr comments
gane5hvarma Nov 8, 2023
7129cfb
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 8, 2023
68232c4
fix count in aborted test
gane5hvarma Nov 8, 2023
04e0934
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Nov 8, 2023
582628f
fix database syncer return
gane5hvarma Nov 8, 2023
08bb132
fix delegate setup test
gane5hvarma Nov 8, 2023
4564996
add status as label and capture only one metric
gane5hvarma Nov 8, 2023
1a4a5bb
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 8, 2023
b9c99a5
remove terminal in labels and only capture terminal status except mig…
gane5hvarma Nov 9, 2023
fbd3b9d
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Nov 9, 2023
0619473
fix test in event stats
gane5hvarma Nov 9, 2023
d3d5a2f
nit pick case and constants
gane5hvarma Nov 9, 2023
b4589e7
add terminal in metric label
gane5hvarma Nov 9, 2023
1bcc5fd
add migrated status back
gane5hvarma Nov 9, 2023
51e5266
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions enterprise/reporting/event_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package reporting

import (
"strconv"

"github.com/rudderlabs/rudder-go-kit/stats"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)

type EventStatsReporter struct {
stats stats.Stats
configSubscriber *configSubscriber
}

func NewEventStatsReporter(configSubscriber *configSubscriber, stats stats.Stats) *EventStatsReporter {
return &EventStatsReporter{
stats: stats,
configSubscriber: configSubscriber,
}
}

const EventsProcessedMetricName = "events_processed_total"

func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) {
for index := range metrics {
tags := stats.Tags{
"workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID),
"sourceId": metrics[index].ConnectionDetails.SourceID,
"destinationId": metrics[index].ConnectionDetails.DestinationID,
"reportedBy": metrics[index].PUDetails.PU,
"sourceCategory": metrics[index].ConnectionDetails.SourceCategory,
"statusCode": strconv.Itoa(metrics[index].StatusDetail.StatusCode),
"terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU),
"destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType,
"status": metrics[index].StatusDetail.Status,
}
es.stats.NewTaggedStat(EventsProcessedMetricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count))
}
}

func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error {
tx.AddSuccessListener(func() {
es.Record(metrics)
})
return nil
}

func (es *EventStatsReporter) Stop() {
}

func (es *EventStatsReporter) DatabaseSyncer(c types.SyncerConfig) types.ReportingSyncer {
return func() {}
}
189 changes: 189 additions & 0 deletions enterprise/reporting/event_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package reporting

import (
"context"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types"
)

func TestEventStatsReporter(t *testing.T) {
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
workspaceID := "test-workspace-id"
sourceID := "test-source-id"
destinationID := "test-destination-id"
reportedBy := "test-reported-by"
sourceCategory := "test-source-category"
ctrl := gomock.NewController(t)
mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(ctrl)
mockBackendConfig.EXPECT().Subscribe(gomock.Any(), backendconfig.TopicBackendConfig).DoAndReturn(func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel {
ch := make(chan pubsub.DataEvent, 1)
ch <- pubsub.DataEvent{
Data: map[string]backendconfig.ConfigT{
workspaceID: {
WorkspaceID: workspaceID,
Sources: []backendconfig.SourceT{
{
ID: sourceID,
Enabled: true,
SourceDefinition: backendconfig.SourceDefinitionT{
Category: sourceCategory,
},
Destinations: []backendconfig.DestinationT{
{
ID: destinationID,
Enabled: true,
DestinationDefinition: backendconfig.DestinationDefinitionT{
Name: "test-destination-name",
},
},
},
},
},
},
},
Topic: string(backendconfig.TopicBackendConfig),
}
close(ch)
return ch
}).AnyTimes()

statsStore := memstats.New()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cs := newConfigSubscriber(logger.NOP)

subscribeDone := make(chan struct{})
go func() {
defer close(subscribeDone)

cs.Subscribe(ctx, mockBackendConfig)
}()

testReports := []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
SourceCategory: sourceCategory,
},
PUDetails: types.PUDetails{
PU: reportedBy,
TerminalPU: true,
},
StatusDetail: &types.StatusDetail{
Count: 10,
Status: jobsdb.Succeeded.State,
StatusCode: 200,
},
},
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
SourceCategory: sourceCategory,
},
PUDetails: types.PUDetails{
PU: reportedBy,
TerminalPU: true,
},
StatusDetail: &types.StatusDetail{
Count: 50,
Status: jobsdb.Aborted.State,
StatusCode: 500,
},
},
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
SourceCategory: sourceCategory,
},
PUDetails: types.PUDetails{
PU: reportedBy,
TerminalPU: true,
},
StatusDetail: &types.StatusDetail{
Count: 150,
Status: jobsdb.Migrated.State,
StatusCode: 500,
},
},
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
SourceCategory: sourceCategory,
},
PUDetails: types.PUDetails{
PU: reportedBy,
TerminalPU: false,
},
StatusDetail: &types.StatusDetail{
Count: 100,
Status: "non-terminal",
StatusCode: 500,
},
},
}
esr := NewEventStatsReporter(cs, statsStore)
esr.Record(testReports)
require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{
"workspaceId": workspaceID,
"sourceId": sourceID,
"destinationId": destinationID,
"reportedBy": reportedBy,
"sourceCategory": sourceCategory,
"statusCode": "200",
"destinationType": "test-destination-name",
"terminal": "true",
"status": jobsdb.Succeeded.State,
}).LastValue(), float64(10))
require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{
"workspaceId": workspaceID,
"sourceId": sourceID,
"destinationId": destinationID,
"reportedBy": reportedBy,
"sourceCategory": sourceCategory,
"statusCode": "500",
"destinationType": "test-destination-name",
"terminal": "true",
"status": jobsdb.Aborted.State,
}).LastValue(), float64(50))
require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{
"workspaceId": workspaceID,
"sourceId": sourceID,
"destinationId": destinationID,
"reportedBy": reportedBy,
"sourceCategory": sourceCategory,
"statusCode": "500",
"destinationType": "test-destination-name",
"terminal": "true",
"status": jobsdb.Migrated.State,
}).LastValue(), float64(150))
require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{
"workspaceId": workspaceID,
"sourceId": sourceID,
"destinationId": destinationID,
"reportedBy": reportedBy,
"sourceCategory": sourceCategory,
"statusCode": "500",
"destinationType": "test-destination-name",
"terminal": "false",
"status": "non-terminal",
}).LastValue(), float64(100))

t.Cleanup(func() {
cancel()
<-subscribeDone
})
}
gane5hvarma marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 7 additions & 3 deletions enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package reporting
import (
"context"

"github.com/rudderlabs/rudder-go-kit/stats"

erridx "github.com/rudderlabs/rudder-server/enterprise/reporting/error_index"

"github.com/rudderlabs/rudder-go-kit/stats"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -23,6 +23,7 @@ type Mediator struct {
ctx context.Context
cancel context.CancelFunc
reporters []types.Reporting
stats stats.Stats
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably skip setting in the struct since it's being used in the constructor only. wdyt?

}

func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator {
gane5hvarma marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -31,6 +32,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke

rm := &Mediator{
log: log,
stats: stats.Default,
g: g,
ctx: ctx,
cancel: cancel,
Expand All @@ -48,7 +50,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke
})

// default reporting implementation
defaultReporter := NewDefaultReporter(rm.ctx, rm.log, configSubscriber)
defaultReporter := NewDefaultReporter(rm.ctx, rm.log, configSubscriber, rm.stats)
rm.reporters = append(rm.reporters, defaultReporter)

// error reporting implementation
Expand All @@ -62,6 +64,8 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke
errorIndexReporter := erridx.NewErrorIndexReporter(rm.ctx, rm.log, configSubscriber, config.Default, stats.Default)
rm.reporters = append(rm.reporters, errorIndexReporter)
}
eventStatsReporter := NewEventStatsReporter(configSubscriber, rm.stats)
rm.reporters = append(rm.reporters, eventStatsReporter)

return rm
}
Expand Down
26 changes: 14 additions & 12 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@
getMinReportedAtQueryTime stats.Measurement
getReportsQueryTime stats.Measurement
requestLatency stats.Measurement
stats stats.Stats
}

func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber *configSubscriber) *DefaultReporter {
func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter {
var dbQueryTimeout *config.Reloadable[time.Duration]

reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.rudderstack.com/")
Expand Down Expand Up @@ -108,6 +109,7 @@
maxOpenConnections: maxOpenConnections,
maxConcurrentRequests: maxConcurrentRequests,
dbQueryTimeout: dbQueryTimeout,
stats: stats,
}
}

Expand Down Expand Up @@ -330,16 +332,16 @@
tr := &http.Transport{}
netClient := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)}
tags := r.getTags(c.Label)
mainLoopTimer := stats.Default.NewTaggedStat(StatReportingMainLoopTime, stats.TimerType, tags)
getReportsTimer := stats.Default.NewTaggedStat(StatReportingGetReportsTime, stats.TimerType, tags)
getReportsCount := stats.Default.NewTaggedStat(StatReportingGetReportsCount, stats.HistogramType, tags)
getAggregatedReportsTimer := stats.Default.NewTaggedStat(StatReportingGetAggregatedReportsTime, stats.TimerType, tags)
getAggregatedReportsCount := stats.Default.NewTaggedStat(StatReportingGetAggregatedReportsCount, stats.HistogramType, tags)

r.getMinReportedAtQueryTime = stats.Default.NewTaggedStat(StatReportingGetMinReportedAtQueryTime, stats.TimerType, tags)
r.getReportsQueryTime = stats.Default.NewTaggedStat(StatReportingGetReportsQueryTime, stats.TimerType, tags)
r.requestLatency = stats.Default.NewTaggedStat(StatReportingHttpReqLatency, stats.TimerType, tags)
reportingLag := stats.Default.NewTaggedStat(
mainLoopTimer := r.stats.NewTaggedStat(StatReportingMainLoopTime, stats.TimerType, tags)
getReportsTimer := r.stats.NewTaggedStat(StatReportingGetReportsTime, stats.TimerType, tags)
getReportsCount := r.stats.NewTaggedStat(StatReportingGetReportsCount, stats.HistogramType, tags)
getAggregatedReportsTimer := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsTime, stats.TimerType, tags)
getAggregatedReportsCount := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsCount, stats.HistogramType, tags)

r.getMinReportedAtQueryTime = r.stats.NewTaggedStat(StatReportingGetMinReportedAtQueryTime, stats.TimerType, tags)
r.getReportsQueryTime = r.stats.NewTaggedStat(StatReportingGetReportsQueryTime, stats.TimerType, tags)
r.requestLatency = r.stats.NewTaggedStat(StatReportingHttpReqLatency, stats.TimerType, tags)
reportingLag := r.stats.NewTaggedStat(

Check warning on line 344 in enterprise/reporting/reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/reporting.go#L335-L344

Added lines #L335 - L344 were not covered by tests
"reporting_metrics_lag_seconds", stats.GaugeType, stats.Tags{"client": c.Label},
)

Expand Down Expand Up @@ -465,7 +467,7 @@
r.requestLatency.Since(httpRequestStart)
httpStatTags := r.getTags(label)
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat(StatReportingHttpReq, stats.CountType, httpStatTags).Count(1)
r.stats.NewTaggedStat(StatReportingHttpReq, stats.CountType, httpStatTags).Count(1)

Check warning on line 470 in enterprise/reporting/reporting.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/reporting.go#L470

Added line #L470 was not covered by tests

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -198,7 +199,7 @@ var _ = Describe("Reporting", func() {
},
}
configSubscriber := newConfigSubscriber(logger.NOP)
reportHandle := NewDefaultReporter(context.Background(), logger.NOP, configSubscriber)
reportHandle := NewDefaultReporter(context.Background(), logger.NOP, configSubscriber, memstats.New())

aggregatedMetrics := reportHandle.getAggregatedReports(inputReports)
Expect(aggregatedMetrics).To(Equal(expectedResponse))
Expand Down
6 changes: 3 additions & 3 deletions enterprise/reporting/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,21 @@ func TestSetupForDelegates(t *testing.T) {
errorReportingEnabled: false,
enterpriseTokenExists: true,
errorIndexReportingEnabled: false,
expectedDelegates: 1,
expectedDelegates: 2,
},
{
reportingEnabled: true,
errorReportingEnabled: true,
enterpriseTokenExists: true,
errorIndexReportingEnabled: false,
expectedDelegates: 2,
expectedDelegates: 3,
},
{
reportingEnabled: true,
errorReportingEnabled: true,
enterpriseTokenExists: true,
errorIndexReportingEnabled: true,
expectedDelegates: 3,
expectedDelegates: 4,
},
{
reportingEnabled: true,
Expand Down
Loading