Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track event delivery stats #3974

Merged
merged 38 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7fc3394
feat: track event delivery stats
gane5hvarma Oct 12, 2023
a9bc0b0
change package name
gane5hvarma Oct 12, 2023
98e2a1f
resolve merge conflicts
gane5hvarma Oct 13, 2023
a30f696
add basic test to check metrics
gane5hvarma Oct 14, 2023
1708089
rm fmt
gane5hvarma Oct 14, 2023
a64b907
make fmt
gane5hvarma Oct 14, 2023
58439a5
make fmt
gane5hvarma Oct 14, 2023
a99bc70
fix setup test
gane5hvarma Oct 14, 2023
14c097a
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 17, 2023
1118a95
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 18, 2023
ba2ba50
Merge branch 'master' into feat.unifiedPipelineMetrics
fracasula Oct 18, 2023
14932f9
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 18, 2023
a4053c9
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 18, 2023
4e2fde9
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 19, 2023
b0f9a4b
add tx listener to caputre event stats
gane5hvarma Oct 30, 2023
91a9275
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Oct 30, 2023
2c633fe
remove unused imports
gane5hvarma Oct 30, 2023
f785db0
fmt file
gane5hvarma Oct 31, 2023
1c7ecbc
fix tests in reporting
gane5hvarma Oct 31, 2023
d0ba83b
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Oct 31, 2023
736e784
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 1, 2023
8dcef04
resolve pr comments
gane5hvarma Nov 1, 2023
259adc8
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Nov 1, 2023
5bde9ad
resolve pr comments
gane5hvarma Nov 8, 2023
7129cfb
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 8, 2023
68232c4
fix count in aborted test
gane5hvarma Nov 8, 2023
04e0934
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Nov 8, 2023
582628f
fix database syncer return
gane5hvarma Nov 8, 2023
08bb132
fix delegate setup test
gane5hvarma Nov 8, 2023
4564996
add status as label and capture only one metric
gane5hvarma Nov 8, 2023
1a4a5bb
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 8, 2023
b9c99a5
remove terminal in labels and only capture terminal status except mig…
gane5hvarma Nov 9, 2023
fbd3b9d
Merge branch 'feat.unifiedPipelineMetrics' of github.com:rudderlabs/r…
gane5hvarma Nov 9, 2023
0619473
fix test in event stats
gane5hvarma Nov 9, 2023
d3d5a2f
nit pick case and constants
gane5hvarma Nov 9, 2023
b4589e7
add terminal in metric label
gane5hvarma Nov 9, 2023
1bcc5fd
add migrated status back
gane5hvarma Nov 9, 2023
51e5266
Merge branch 'master' into feat.unifiedPipelineMetrics
gane5hvarma Nov 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions enterprise/reporting/event_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package reporting

import (
"database/sql"
"fmt"
"strconv"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/utils/types"
)

const (
EventsDeliveredMetricName = "events_delivered_total"
EventsAbortedMetricName = "events_aborted_total"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion, would it be better if we use a single metric instead?

Suggested change
EventsDeliveredMetricName = "events_delivered_total"
EventsAbortedMetricName = "events_aborted_total"
EventsProcessedMetricName = "events_processed_total"

And have status as label/tag.

It would make it more convenient in the future if we want to capture other categories, plus the summary (delivered+aborted) is a useful number and can be used on it own or to create percentages

)
atzoum marked this conversation as resolved.
Show resolved Hide resolved

type EventStatsReporter struct {
stats stats.Stats
configSubscriber *configSubscriber
}

func NewEventStatsReporter(configSubscriber *configSubscriber, stats stats.Stats) *EventStatsReporter {
return &EventStatsReporter{
stats: stats,
configSubscriber: configSubscriber,
}
}

func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, _ *sql.Tx) error {
atzoum marked this conversation as resolved.
Show resolved Hide resolved
// tracking delivery event stats which has state - succeeded, aborted
for _, metric := range metrics {
metric := metric
gane5hvarma marked this conversation as resolved.
Show resolved Hide resolved
if !(metric.StatusDetail.Status == "aborted" || metric.StatusDetail.Status == "succeeded") {
gane5hvarma marked this conversation as resolved.
Show resolved Hide resolved
continue
}
tags := stats.Tags{
"workspaceId": es.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID),
"sourceId": metric.ConnectionDetails.SourceID,
"destinationId": metric.ConnectionDetails.DestinationID,
"reportedBy": metric.PUDetails.PU,
"sourceCategory": metric.ConnectionDetails.SourceCategory,
"terminal": strconv.FormatBool(metric.PUDetails.TerminalPU),
}
fmt.Println(tags)
metricName := EventsDeliveredMetricName
if metric.StatusDetail.Status == "aborted" {
metricName = EventsAbortedMetricName
}
es.stats.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metric.StatusDetail.Count))
}
return nil
}

func (es *EventStatsReporter) DatabaseSyncer(types.SyncerConfig) types.ReportingSyncer {
return func() {}
}
142 changes: 142 additions & 0 deletions enterprise/reporting/event_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package reporting

import (
"context"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types"
)

func TestEventStatsReporter(t *testing.T) {
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
workspaceID := "test-workspace-id"
sourceID := "test-source-id"
destinationID := "test-destination-id"
reportedBy := "test-reported-by"
sourceCategory := "test-source-category"
ctrl := gomock.NewController(t)
mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(ctrl)
mockBackendConfig.EXPECT().Subscribe(gomock.Any(), backendconfig.TopicBackendConfig).DoAndReturn(func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel {
ch := make(chan pubsub.DataEvent, 1)
ch <- pubsub.DataEvent{
Data: map[string]backendconfig.ConfigT{
workspaceID: {
WorkspaceID: workspaceID,
Sources: []backendconfig.SourceT{
{
ID: sourceID,
Enabled: true,
SourceDefinition: backendconfig.SourceDefinitionT{
Category: sourceCategory,
},
Destinations: []backendconfig.DestinationT{
{
ID: destinationID,
Enabled: true,
},
},
},
},
},
},
Topic: string(backendconfig.TopicBackendConfig),
}
close(ch)
return ch
}).AnyTimes()

statsStore := memstats.New()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cs := newConfigSubscriber(logger.NOP)

subscribeDone := make(chan struct{})
go func() {
defer close(subscribeDone)

cs.Subscribe(ctx, mockBackendConfig)
}()

testReports := []*types.PUReportedMetric{
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
SourceCategory: sourceCategory,
},
PUDetails: types.PUDetails{
PU: reportedBy,
TerminalPU: true,
},
StatusDetail: &types.StatusDetail{
Count: 10,
Status: "succeeded",
},
},
{
ConnectionDetails: types.ConnectionDetails{
SourceID: sourceID,
DestinationID: destinationID,
SourceCategory: sourceCategory,
},
PUDetails: types.PUDetails{
PU: reportedBy,
TerminalPU: false,
},
StatusDetail: &types.StatusDetail{
Count: 10,
Status: "aborted",
},
},
// {
// ConnectionDetails: types.ConnectionDetails{
// SourceID: "momentum",
// DestinationID: destinationID,
// SourceCategory: sourceCategory,
// },
// PUDetails: types.PUDetails{
// PU: reportedBy,
// TerminalPU: false,
// },
// StatusDetail: &types.StatusDetail{
// Count: 10,
// Status: "momentum",
// },
// },
}
esr := NewEventStatsReporter(cs, statsStore)
err := esr.Report(testReports, nil)
require.NoError(t, err)
require.Equal(t, statsStore.Get(EventsDeliveredMetricName, map[string]string{
"workspaceId": workspaceID,
"sourceId": sourceID,
"destinationId": destinationID,
"reportedBy": reportedBy,
"sourceCategory": sourceCategory,
"terminal": "true",
}).LastValue(), float64(10))
require.Equal(t, statsStore.Get(EventsAbortedMetricName, map[string]string{
"workspaceId": workspaceID,
"sourceId": sourceID,
"destinationId": destinationID,
"reportedBy": reportedBy,
"sourceCategory": sourceCategory,
"terminal": "false",
}).LastValue(), float64(10))
// require.Equal(t, statsStore.Get(EventsAbortedMetricName, map[string]string{
// "workspaceId": workspaceID,
// "sourceId": "momentum",
// "destinationId": destinationID,
// "reportedBy": reportedBy,
// "sourceCategory": sourceCategory,
// "terminal": "false",
// }), nil)
}
gane5hvarma marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 6 additions & 1 deletion enterprise/reporting/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"

"github.com/rudderlabs/rudder-go-kit/stats"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -18,11 +20,13 @@ type Mediator struct {
g *errgroup.Group
ctx context.Context
reporters []types.Reporting
stats stats.Stats
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably skip setting in the struct since it's being used in the constructor only. wdyt?

}

func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator {
gane5hvarma marked this conversation as resolved.
Show resolved Hide resolved
rm := &Mediator{
log: log,
log: log,
stats: stats.Default,
}
rm.g, rm.ctx = errgroup.WithContext(ctx)

Expand Down Expand Up @@ -52,6 +56,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke
errorIndexReporter := NewErrorIndexReporter(rm.ctx, config.Default, rm.log, configSubscriber)
rm.reporters = append(rm.reporters, errorIndexReporter)
}
rm.reporters = append(rm.reporters, NewEventStatsReporter(configSubscriber, rm.stats))

return rm
}
Expand Down
6 changes: 3 additions & 3 deletions enterprise/reporting/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,21 @@ func TestSetupForDelegates(t *testing.T) {
errorReportingEnabled: false,
enterpriseTokenExists: true,
errorIndexReportingEnabled: false,
expectedDelegates: 1,
expectedDelegates: 2,
},
{
reportingEnabled: true,
errorReportingEnabled: true,
enterpriseTokenExists: true,
errorIndexReportingEnabled: false,
expectedDelegates: 2,
expectedDelegates: 3,
},
{
reportingEnabled: true,
errorReportingEnabled: true,
enterpriseTokenExists: true,
errorIndexReportingEnabled: true,
expectedDelegates: 3,
expectedDelegates: 4,
},
{
reportingEnabled: true,
Expand Down
Loading