From 7fc339447d33591b2671688d256ca622ce04199c Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Thu, 12 Oct 2023 16:42:11 -0400 Subject: [PATCH 01/21] feat: track event delivery stats --- enterprise/reporting/event-stats/stats.go | 35 +++++++++++++++++++++++ enterprise/reporting/reporting.go | 3 ++ 2 files changed, 38 insertions(+) create mode 100644 enterprise/reporting/event-stats/stats.go diff --git a/enterprise/reporting/event-stats/stats.go b/enterprise/reporting/event-stats/stats.go new file mode 100644 index 0000000000..101db9ed8b --- /dev/null +++ b/enterprise/reporting/event-stats/stats.go @@ -0,0 +1,35 @@ +package reporting + +import ( + "strconv" + + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/utils/types" +) + +const ( + EventsDeliveredMetricName = "events_delivered_total" + EventsAbortedMetricName = "events_aborted_total" +) + +func Report(workspaceId string, metric *types.PUReportedMetric) { + // tracking delivery event stats which has state - succeeded, aborted + if !(metric.StatusDetail.Status == "aborted" || metric.StatusDetail.Status == "succeeded") { + return + } + + tags := map[string]string{ + "workspaceId": workspaceId, + "sourceId": metric.ConnectionDetails.SourceID, + "destinationId": metric.ConnectionDetails.DestinationID, + "reportedBy": metric.PUDetails.PU, + "sourceCategory": metric.ConnectionDetails.SourceCategory, + "terminal": strconv.FormatBool(metric.PUDetails.TerminalPU), + } + + metricName := EventsDeliveredMetricName + if metric.StatusDetail.Status == "aborted" { + metricName = EventsAbortedMetricName + } + stats.Default.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metric.StatusDetail.Count)) +} diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 2284ba2a0b..1165787367 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -24,6 +24,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + eventStats "github.com/rudderlabs/rudder-server/enterprise/reporting/event-stats" migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/misc" @@ -643,6 +644,8 @@ func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) if err != nil { panic(err) } + + eventStats.Report(workspaceID, &metric) } _, err = stmt.Exec() From a9bc0b05a1ab3ee8c77a24081bbc9dae3197b866 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Thu, 12 Oct 2023 18:14:39 -0400 Subject: [PATCH 02/21] change package name --- enterprise/reporting/event-stats/stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise/reporting/event-stats/stats.go b/enterprise/reporting/event-stats/stats.go index 101db9ed8b..63b50abaed 100644 --- a/enterprise/reporting/event-stats/stats.go +++ b/enterprise/reporting/event-stats/stats.go @@ -1,4 +1,4 @@ -package reporting +package event_stats import ( "strconv" From a30f696ff37337ee2757d4cbbffb2e8cde249654 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Fri, 13 Oct 2023 21:41:02 -0400 Subject: [PATCH 03/21] add basic test to check metrics --- enterprise/reporting/event-stats/stats.go | 35 ------ enterprise/reporting/event_stats.go | 60 +++++++++ enterprise/reporting/event_stats_test.go | 142 ++++++++++++++++++++++ enterprise/reporting/mediator.go | 7 +- enterprise/reporting/reporting.go | 3 - 5 files changed, 208 insertions(+), 39 deletions(-) delete mode 100644 enterprise/reporting/event-stats/stats.go create mode 100644 enterprise/reporting/event_stats.go create mode 100644 enterprise/reporting/event_stats_test.go diff --git a/enterprise/reporting/event-stats/stats.go b/enterprise/reporting/event-stats/stats.go deleted file mode 100644 index 63b50abaed..0000000000 --- a/enterprise/reporting/event-stats/stats.go +++ /dev/null @@ -1,35 +0,0 @@ -package event_stats - -import ( - "strconv" - - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/rudderlabs/rudder-server/utils/types" -) - -const ( - EventsDeliveredMetricName = "events_delivered_total" - EventsAbortedMetricName = "events_aborted_total" -) - -func Report(workspaceId string, metric *types.PUReportedMetric) { - // tracking delivery event stats which has state - succeeded, aborted - if !(metric.StatusDetail.Status == "aborted" || metric.StatusDetail.Status == "succeeded") { - return - } - - tags := map[string]string{ - "workspaceId": workspaceId, - "sourceId": metric.ConnectionDetails.SourceID, - "destinationId": metric.ConnectionDetails.DestinationID, - "reportedBy": metric.PUDetails.PU, - "sourceCategory": metric.ConnectionDetails.SourceCategory, - "terminal": strconv.FormatBool(metric.PUDetails.TerminalPU), - } - - metricName := EventsDeliveredMetricName - if metric.StatusDetail.Status == "aborted" { - metricName = EventsAbortedMetricName - } - stats.Default.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metric.StatusDetail.Count)) -} diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go new file mode 100644 index 0000000000..966cb109c4 --- /dev/null +++ b/enterprise/reporting/event_stats.go @@ -0,0 +1,60 @@ +package reporting + +import ( + "database/sql" + "fmt" + "strconv" + + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/utils/types" +) + +const ( + EventsDeliveredMetricName = "events_delivered_total" + EventsAbortedMetricName = "events_aborted_total" +) + +type EventStatsReporter struct { + stats stats.Stats + configSubscriber *configSubscriber +} + +func NewEventStatsReporter(configSubscriber *configSubscriber, stats stats.Stats) *EventStatsReporter { + return &EventStatsReporter{ + stats: stats, + configSubscriber: configSubscriber, + } +} + +func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, _ *sql.Tx) error { + // tracking delivery event stats which has state - succeeded, aborted + for _, metric := range metrics { + metric := metric + if !(metric.StatusDetail.Status == "aborted" || metric.StatusDetail.Status == "succeeded") { + continue + } + fmt.Println(es.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID)) + tags := stats.Tags{ + "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID), + "sourceId": metric.ConnectionDetails.SourceID, + "destinationId": metric.ConnectionDetails.DestinationID, + "reportedBy": metric.PUDetails.PU, + "sourceCategory": metric.ConnectionDetails.SourceCategory, + "terminal": strconv.FormatBool(metric.PUDetails.TerminalPU), + } + fmt.Println(tags, "tags") + + metricName := EventsDeliveredMetricName + if metric.StatusDetail.Status == "aborted" { + metricName = EventsAbortedMetricName + } + fmt.Println(metricName, "metricName") + fmt.Println(es.stats) + es.stats.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metric.StatusDetail.Count)) + } + return nil +} + +func (es *EventStatsReporter) DatabaseSyncer(types.SyncerConfig) types.ReportingSyncer { + return func() {} +} diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go new file mode 100644 index 0000000000..c6c693012a --- /dev/null +++ b/enterprise/reporting/event_stats_test.go @@ -0,0 +1,142 @@ +package reporting + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" + "github.com/rudderlabs/rudder-server/utils/pubsub" + "github.com/rudderlabs/rudder-server/utils/types" + "github.com/stretchr/testify/require" +) + +func TestEventStatsReporter(t *testing.T) { + workspaceID := "test-workspace-id" + sourceID := "test-source-id" + destinationID := "test-destination-id" + reportedBy := "test-reported-by" + sourceCategory := "test-source-category" + ctrl := gomock.NewController(t) + mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(ctrl) + mockBackendConfig.EXPECT().Subscribe(gomock.Any(), backendconfig.TopicBackendConfig).DoAndReturn(func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel { + ch := make(chan pubsub.DataEvent, 1) + ch <- pubsub.DataEvent{ + Data: map[string]backendconfig.ConfigT{ + workspaceID: { + WorkspaceID: workspaceID, + Sources: []backendconfig.SourceT{ + { + ID: sourceID, + Enabled: true, + SourceDefinition: backendconfig.SourceDefinitionT{ + Category: sourceCategory, + }, + Destinations: []backendconfig.DestinationT{ + { + ID: destinationID, + Enabled: true, + }, + }, + }, + }, + }, + }, + Topic: string(backendconfig.TopicBackendConfig), + } + close(ch) + return ch + }).AnyTimes() + + statsStore := memstats.New() + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + cs := newConfigSubscriber(logger.NOP) + + subscribeDone := make(chan struct{}) + go func() { + defer close(subscribeDone) + + cs.Subscribe(ctx, mockBackendConfig) + }() + + testReports := []*types.PUReportedMetric{ + { + ConnectionDetails: types.ConnectionDetails{ + SourceID: sourceID, + DestinationID: destinationID, + SourceCategory: sourceCategory, + }, + PUDetails: types.PUDetails{ + PU: reportedBy, + TerminalPU: true, + }, + StatusDetail: &types.StatusDetail{ + Count: 10, + Status: "succeeded", + }, + }, + { + ConnectionDetails: types.ConnectionDetails{ + SourceID: sourceID, + DestinationID: destinationID, + SourceCategory: sourceCategory, + }, + PUDetails: types.PUDetails{ + PU: reportedBy, + TerminalPU: false, + }, + StatusDetail: &types.StatusDetail{ + Count: 10, + Status: "aborted", + }, + }, + { + ConnectionDetails: types.ConnectionDetails{ + SourceID: "momentum", + DestinationID: destinationID, + SourceCategory: sourceCategory, + }, + PUDetails: types.PUDetails{ + PU: reportedBy, + TerminalPU: false, + }, + StatusDetail: &types.StatusDetail{ + Count: 10, + Status: "momentum", + }, + }, + } + esr := NewEventStatsReporter(cs, statsStore) + err := esr.Report(testReports, nil) + require.NoError(t, err) + require.Equal(t, statsStore.Get(EventsDeliveredMetricName, map[string]string{ + "workspaceId": workspaceID, + "sourceId": sourceID, + "destinationId": destinationID, + "reportedBy": reportedBy, + "sourceCategory": sourceCategory, + "terminal": "true", + }).LastValue(), float64(10)) + require.Equal(t, statsStore.Get(EventsAbortedMetricName, map[string]string{ + "workspaceId": workspaceID, + "sourceId": sourceID, + "destinationId": destinationID, + "reportedBy": reportedBy, + "sourceCategory": sourceCategory, + "terminal": "false", + }).LastValue(), float64(10)) + require.Equal(t, statsStore.Get(EventsAbortedMetricName, map[string]string{ + "workspaceId": workspaceID, + "sourceId": "momentum", + "destinationId": destinationID, + "reportedBy": reportedBy, + "sourceCategory": sourceCategory, + "terminal": "false", + }), nil) + +} diff --git a/enterprise/reporting/mediator.go b/enterprise/reporting/mediator.go index 3a4b96cd69..5a7980ff60 100644 --- a/enterprise/reporting/mediator.go +++ b/enterprise/reporting/mediator.go @@ -4,6 +4,8 @@ import ( "context" "database/sql" + "github.com/rudderlabs/rudder-go-kit/stats" + "golang.org/x/sync/errgroup" "github.com/rudderlabs/rudder-go-kit/config" @@ -18,11 +20,13 @@ type Mediator struct { g *errgroup.Group ctx context.Context reporters []types.Reporting + stats stats.Stats } func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator { rm := &Mediator{ - log: log, + log: log, + stats: stats.Default, } rm.g, rm.ctx = errgroup.WithContext(ctx) @@ -52,6 +56,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke errorIndexReporter := NewErrorIndexReporter(rm.ctx, config.Default, rm.log, configSubscriber) rm.reporters = append(rm.reporters, errorIndexReporter) } + rm.reporters = append(rm.reporters, NewEventStatsReporter(configSubscriber, rm.stats)) return rm } diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 11ff5a3793..a2063c168c 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -23,7 +23,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" - eventStats "github.com/rudderlabs/rudder-server/enterprise/reporting/event-stats" migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" "github.com/rudderlabs/rudder-server/utils/httputil" "github.com/rudderlabs/rudder-server/utils/misc" @@ -585,8 +584,6 @@ func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) if err != nil { return fmt.Errorf("executing statement: %v", err) } - - eventStats.Report(workspaceID, &metric) } if _, err = stmt.Exec(); err != nil { return fmt.Errorf("executing final statement: %v", err) From 17080897d0d7df21d56ddc8b70d39410f1d9516d Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Fri, 13 Oct 2023 21:43:14 -0400 Subject: [PATCH 04/21] rm fmt --- enterprise/reporting/event_stats.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 966cb109c4..2aa420c7b5 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -2,7 +2,6 @@ package reporting import ( "database/sql" - "fmt" "strconv" "github.com/rudderlabs/rudder-go-kit/stats" @@ -33,7 +32,6 @@ func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, _ *sql.T if !(metric.StatusDetail.Status == "aborted" || metric.StatusDetail.Status == "succeeded") { continue } - fmt.Println(es.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID)) tags := stats.Tags{ "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID), "sourceId": metric.ConnectionDetails.SourceID, @@ -42,14 +40,11 @@ func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, _ *sql.T "sourceCategory": metric.ConnectionDetails.SourceCategory, "terminal": strconv.FormatBool(metric.PUDetails.TerminalPU), } - fmt.Println(tags, "tags") metricName := EventsDeliveredMetricName if metric.StatusDetail.Status == "aborted" { metricName = EventsAbortedMetricName } - fmt.Println(metricName, "metricName") - fmt.Println(es.stats) es.stats.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metric.StatusDetail.Count)) } return nil From a64b907fc25e531d4339f4d20fc7aaa9d1af3c59 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Fri, 13 Oct 2023 21:49:57 -0400 Subject: [PATCH 05/21] make fmt --- enterprise/reporting/event_stats_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index c6c693012a..c9477a4180 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -5,13 +5,14 @@ import ( "testing" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats/memstats" backendconfig "github.com/rudderlabs/rudder-server/backend-config" mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" "github.com/rudderlabs/rudder-server/utils/pubsub" "github.com/rudderlabs/rudder-server/utils/types" - "github.com/stretchr/testify/require" ) func TestEventStatsReporter(t *testing.T) { @@ -138,5 +139,4 @@ func TestEventStatsReporter(t *testing.T) { "sourceCategory": sourceCategory, "terminal": "false", }), nil) - } From 58439a537fa7b366f8b45f111734fc7ee786f932 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Fri, 13 Oct 2023 22:19:01 -0400 Subject: [PATCH 06/21] make fmt --- enterprise/reporting/event_stats.go | 3 +- enterprise/reporting/event_stats_test.go | 46 ++++++++++++------------ 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 2aa420c7b5..2ec8347056 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -2,6 +2,7 @@ package reporting import ( "database/sql" + "fmt" "strconv" "github.com/rudderlabs/rudder-go-kit/stats" @@ -40,7 +41,7 @@ func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, _ *sql.T "sourceCategory": metric.ConnectionDetails.SourceCategory, "terminal": strconv.FormatBool(metric.PUDetails.TerminalPU), } - + fmt.Println(tags) metricName := EventsDeliveredMetricName if metric.StatusDetail.Status == "aborted" { metricName = EventsAbortedMetricName diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index c9477a4180..7bceccbcd4 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -96,21 +96,21 @@ func TestEventStatsReporter(t *testing.T) { Status: "aborted", }, }, - { - ConnectionDetails: types.ConnectionDetails{ - SourceID: "momentum", - DestinationID: destinationID, - SourceCategory: sourceCategory, - }, - PUDetails: types.PUDetails{ - PU: reportedBy, - TerminalPU: false, - }, - StatusDetail: &types.StatusDetail{ - Count: 10, - Status: "momentum", - }, - }, + // { + // ConnectionDetails: types.ConnectionDetails{ + // SourceID: "momentum", + // DestinationID: destinationID, + // SourceCategory: sourceCategory, + // }, + // PUDetails: types.PUDetails{ + // PU: reportedBy, + // TerminalPU: false, + // }, + // StatusDetail: &types.StatusDetail{ + // Count: 10, + // Status: "momentum", + // }, + // }, } esr := NewEventStatsReporter(cs, statsStore) err := esr.Report(testReports, nil) @@ -131,12 +131,12 @@ func TestEventStatsReporter(t *testing.T) { "sourceCategory": sourceCategory, "terminal": "false", }).LastValue(), float64(10)) - require.Equal(t, statsStore.Get(EventsAbortedMetricName, map[string]string{ - "workspaceId": workspaceID, - "sourceId": "momentum", - "destinationId": destinationID, - "reportedBy": reportedBy, - "sourceCategory": sourceCategory, - "terminal": "false", - }), nil) + // require.Equal(t, statsStore.Get(EventsAbortedMetricName, map[string]string{ + // "workspaceId": workspaceID, + // "sourceId": "momentum", + // "destinationId": destinationID, + // "reportedBy": reportedBy, + // "sourceCategory": sourceCategory, + // "terminal": "false", + // }), nil) } From a99bc70d8f7eda142e1dc423868aca98fbc3bc47 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Sat, 14 Oct 2023 09:53:20 -0400 Subject: [PATCH 07/21] fix setup test --- enterprise/reporting/setup_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/enterprise/reporting/setup_test.go b/enterprise/reporting/setup_test.go index 1c23116370..c0e092ef0b 100644 --- a/enterprise/reporting/setup_test.go +++ b/enterprise/reporting/setup_test.go @@ -78,21 +78,21 @@ func TestSetupForDelegates(t *testing.T) { errorReportingEnabled: false, enterpriseTokenExists: true, errorIndexReportingEnabled: false, - expectedDelegates: 1, + expectedDelegates: 2, }, { reportingEnabled: true, errorReportingEnabled: true, enterpriseTokenExists: true, errorIndexReportingEnabled: false, - expectedDelegates: 2, + expectedDelegates: 3, }, { reportingEnabled: true, errorReportingEnabled: true, enterpriseTokenExists: true, errorIndexReportingEnabled: true, - expectedDelegates: 3, + expectedDelegates: 4, }, { reportingEnabled: true, From 2c633fe30b758405cd3b95d02d9329a5e8c1605c Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Mon, 30 Oct 2023 19:20:17 -0400 Subject: [PATCH 08/21] remove unused imports --- enterprise/reporting/event_stats.go | 1 - enterprise/reporting/mediator.go | 6 ++---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 17e1f97192..75dd2315b8 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -5,7 +5,6 @@ import ( "strconv" "github.com/rudderlabs/rudder-go-kit/stats" - . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck "github.com/rudderlabs/rudder-server/utils/types" ) diff --git a/enterprise/reporting/mediator.go b/enterprise/reporting/mediator.go index cb2b3106dd..6de214c111 100644 --- a/enterprise/reporting/mediator.go +++ b/enterprise/reporting/mediator.go @@ -3,8 +3,6 @@ package reporting import ( "context" - "github.com/rudderlabs/rudder-go-kit/stats" - erridx "github.com/rudderlabs/rudder-server/enterprise/reporting/error_index" "github.com/rudderlabs/rudder-go-kit/stats" @@ -33,8 +31,8 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke g, ctx := errgroup.WithContext(ctx) rm := &Mediator{ - log: log, - stats: stats.Default, + log: log, + stats: stats.Default, g: g, ctx: ctx, cancel: cancel, From f785db08dbf06301885aa31a1f19ff33d5872213 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Mon, 30 Oct 2023 20:02:24 -0400 Subject: [PATCH 09/21] fmt file --- enterprise/reporting/reporting.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 70acefb493..3b4113d7cf 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -552,8 +552,6 @@ func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) err "event_name", "event_type", "error_type", )) - - if err != nil { return fmt.Errorf("preparing statement: %v", err) } From 1c7ecbc527ececb2caddce1354f7b308c735095e Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Tue, 31 Oct 2023 12:42:06 -0400 Subject: [PATCH 10/21] fix tests in reporting --- enterprise/reporting/setup_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/enterprise/reporting/setup_test.go b/enterprise/reporting/setup_test.go index c0e092ef0b..1c23116370 100644 --- a/enterprise/reporting/setup_test.go +++ b/enterprise/reporting/setup_test.go @@ -78,21 +78,21 @@ func TestSetupForDelegates(t *testing.T) { errorReportingEnabled: false, enterpriseTokenExists: true, errorIndexReportingEnabled: false, - expectedDelegates: 2, + expectedDelegates: 1, }, { reportingEnabled: true, errorReportingEnabled: true, enterpriseTokenExists: true, errorIndexReportingEnabled: false, - expectedDelegates: 3, + expectedDelegates: 2, }, { reportingEnabled: true, errorReportingEnabled: true, enterpriseTokenExists: true, errorIndexReportingEnabled: true, - expectedDelegates: 4, + expectedDelegates: 3, }, { reportingEnabled: true, From 8dcef04b2c87a8243796b08498c0d8fc582e0bfa Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Wed, 1 Nov 2023 11:22:23 -0400 Subject: [PATCH 11/21] resolve pr comments --- enterprise/reporting/event_stats.go | 29 +++++++++----------- enterprise/reporting/event_stats_test.go | 34 ++++++++++++++++++++++-- enterprise/reporting/mediator.go | 2 +- enterprise/reporting/reporting.go | 30 ++++++++++++--------- enterprise/reporting/reporting_test.go | 3 ++- 5 files changed, 65 insertions(+), 33 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 75dd2315b8..76ff6a8f31 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -1,7 +1,6 @@ package reporting import ( - "fmt" "strconv" "github.com/rudderlabs/rudder-go-kit/stats" @@ -25,28 +24,26 @@ func NewEventStatsReporter(configSubscriber *configSubscriber, stats stats.Stats } } -func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric) error { +func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric) { // tracking delivery event stats which has state - succeeded, aborted - for _, metric := range metrics { - metric := metric - if !(metric.StatusDetail.Status == "aborted" || metric.StatusDetail.Status == "succeeded") { + for index := range metrics { + if !(metrics[index].StatusDetail.Status == "aborted" || metrics[index].StatusDetail.Status == "succeeded") { continue } tags := stats.Tags{ - "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metric.ConnectionDetails.SourceID), - "sourceId": metric.ConnectionDetails.SourceID, - "destinationId": metric.ConnectionDetails.DestinationID, - "reportedBy": metric.PUDetails.PU, - "sourceCategory": metric.ConnectionDetails.SourceCategory, - "terminal": strconv.FormatBool(metric.PUDetails.TerminalPU), - "status_code": fmt.Sprintf("%d", metric.StatusDetail.StatusCode), - "destinationType": es.configSubscriber.destinationIDMap[metric.ConnectionDetails.DestinationID].destType, + "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), + "sourceId": metrics[index].ConnectionDetails.SourceID, + "destinationId": metrics[index].ConnectionDetails.DestinationID, + "reportedBy": metrics[index].PUDetails.PU, + "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, + "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), + "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), + "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, } metricName := EventsDeliveredMetricName - if metric.StatusDetail.Status == "aborted" { + if metrics[index].StatusDetail.Status == "aborted" { metricName = EventsAbortedMetricName } - es.stats.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metric.StatusDetail.Count)) + es.stats.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) } - return nil } diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index 8a3c212481..c2642a2b03 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -101,10 +101,25 @@ func TestEventStatsReporter(t *testing.T) { StatusCode: 500, }, }, + { + ConnectionDetails: types.ConnectionDetails{ + SourceID: "im-not-there", + DestinationID: destinationID, + SourceCategory: sourceCategory, + }, + PUDetails: types.PUDetails{ + PU: reportedBy, + TerminalPU: false, + }, + StatusDetail: &types.StatusDetail{ + Count: 100, + Status: "failed", + StatusCode: 500, + }, + }, } esr := NewEventStatsReporter(cs, statsStore) - err := esr.Report(testReports) - require.NoError(t, err) + esr.Report(testReports) require.Equal(t, statsStore.Get(EventsDeliveredMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, @@ -125,4 +140,19 @@ func TestEventStatsReporter(t *testing.T) { "status_code": "500", "destinationType": "test-destination-name", }).LastValue(), float64(10)) + require.Equal(t, statsStore.Get(EventsDeliveredMetricName, map[string]string{ + "workspaceId": workspaceID, + "sourceId": sourceID, + "destinationId": destinationID, + "reportedBy": reportedBy, + "sourceCategory": sourceCategory, + "terminal": "true", + "status_code": "200", + "destinationType": "test-destination-name", + }).LastValue(), float64(10)) // last value should be 10 because we are not reporting failed events + + t.Cleanup(func() { + cancel() + <-subscribeDone + }) } diff --git a/enterprise/reporting/mediator.go b/enterprise/reporting/mediator.go index 6de214c111..2b110d830f 100644 --- a/enterprise/reporting/mediator.go +++ b/enterprise/reporting/mediator.go @@ -50,7 +50,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke }) // default reporting implementation - defaultReporter := NewDefaultReporter(rm.ctx, rm.log, configSubscriber) + defaultReporter := NewDefaultReporter(rm.ctx, rm.log, configSubscriber, rm.stats) rm.reporters = append(rm.reporters, defaultReporter) // error reporting implementation diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 3b4113d7cf..19501541a5 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -69,9 +69,11 @@ type DefaultReporter struct { getMinReportedAtQueryTime stats.Measurement getReportsQueryTime stats.Measurement requestLatency stats.Measurement + statsReporter *EventStatsReporter + stats stats.Stats } -func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber *configSubscriber) *DefaultReporter { +func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter { var dbQueryTimeout *config.Reloadable[time.Duration] reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.rudderstack.com/") @@ -108,6 +110,8 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber maxOpenConnections: maxOpenConnections, maxConcurrentRequests: maxConcurrentRequests, dbQueryTimeout: dbQueryTimeout, + statsReporter: NewEventStatsReporter(configSubscriber, stats), + stats: stats, } } @@ -330,16 +334,16 @@ func (r *DefaultReporter) mainLoop(ctx context.Context, c types.SyncerConfig) { tr := &http.Transport{} netClient := &http.Client{Transport: tr, Timeout: config.GetDuration("HttpClient.reporting.timeout", 60, time.Second)} tags := r.getTags(c.Label) - mainLoopTimer := stats.Default.NewTaggedStat(StatReportingMainLoopTime, stats.TimerType, tags) - getReportsTimer := stats.Default.NewTaggedStat(StatReportingGetReportsTime, stats.TimerType, tags) - getReportsCount := stats.Default.NewTaggedStat(StatReportingGetReportsCount, stats.HistogramType, tags) - getAggregatedReportsTimer := stats.Default.NewTaggedStat(StatReportingGetAggregatedReportsTime, stats.TimerType, tags) - getAggregatedReportsCount := stats.Default.NewTaggedStat(StatReportingGetAggregatedReportsCount, stats.HistogramType, tags) - - r.getMinReportedAtQueryTime = stats.Default.NewTaggedStat(StatReportingGetMinReportedAtQueryTime, stats.TimerType, tags) - r.getReportsQueryTime = stats.Default.NewTaggedStat(StatReportingGetReportsQueryTime, stats.TimerType, tags) - r.requestLatency = stats.Default.NewTaggedStat(StatReportingHttpReqLatency, stats.TimerType, tags) - reportingLag := stats.Default.NewTaggedStat( + mainLoopTimer := r.stats.NewTaggedStat(StatReportingMainLoopTime, stats.TimerType, tags) + getReportsTimer := r.stats.NewTaggedStat(StatReportingGetReportsTime, stats.TimerType, tags) + getReportsCount := r.stats.NewTaggedStat(StatReportingGetReportsCount, stats.HistogramType, tags) + getAggregatedReportsTimer := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsTime, stats.TimerType, tags) + getAggregatedReportsCount := r.stats.NewTaggedStat(StatReportingGetAggregatedReportsCount, stats.HistogramType, tags) + + r.getMinReportedAtQueryTime = r.stats.NewTaggedStat(StatReportingGetMinReportedAtQueryTime, stats.TimerType, tags) + r.getReportsQueryTime = r.stats.NewTaggedStat(StatReportingGetReportsQueryTime, stats.TimerType, tags) + r.requestLatency = r.stats.NewTaggedStat(StatReportingHttpReqLatency, stats.TimerType, tags) + reportingLag := r.stats.NewTaggedStat( "reporting_metrics_lag_seconds", stats.GaugeType, stats.Tags{"client": c.Label}, ) @@ -465,7 +469,7 @@ func (r *DefaultReporter) sendMetric(ctx context.Context, netClient *http.Client r.requestLatency.Since(httpRequestStart) httpStatTags := r.getTags(label) httpStatTags["status"] = strconv.Itoa(resp.StatusCode) - stats.Default.NewTaggedStat(StatReportingHttpReq, stats.CountType, httpStatTags).Count(1) + r.stats.NewTaggedStat(StatReportingHttpReq, stats.CountType, httpStatTags).Count(1) defer func() { httputil.CloseResponse(resp) }() respBody, err := io.ReadAll(resp.Body) @@ -557,7 +561,7 @@ func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) err } defer func() { _ = stmt.Close() }() txn.AddSuccessListener(func() { - NewEventStatsReporter(r.configSubscriber, stats.Default).Report(metrics) + r.statsReporter.Report(metrics) }) reportedAt := time.Now().UTC().Unix() / 60 diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 51935b1f55..6d0ef75a94 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -198,7 +199,7 @@ var _ = Describe("Reporting", func() { }, } configSubscriber := newConfigSubscriber(logger.NOP) - reportHandle := NewDefaultReporter(context.Background(), logger.NOP, configSubscriber) + reportHandle := NewDefaultReporter(context.Background(), logger.NOP, configSubscriber, stats.Default) aggregatedMetrics := reportHandle.getAggregatedReports(inputReports) Expect(aggregatedMetrics).To(Equal(expectedResponse)) From 5bde9adeb4690fe8703e58406163936a1113a11a Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Wed, 8 Nov 2023 06:53:22 -0500 Subject: [PATCH 12/21] resolve pr comments --- enterprise/reporting/event_stats.go | 54 ++++++++++++++---------- enterprise/reporting/event_stats_test.go | 20 +++------ enterprise/reporting/mediator.go | 2 + enterprise/reporting/reporting.go | 3 -- enterprise/reporting/reporting_test.go | 4 +- 5 files changed, 41 insertions(+), 42 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 76ff6a8f31..29b4cdea29 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -4,13 +4,14 @@ import ( "strconv" "github.com/rudderlabs/rudder-go-kit/stats" + . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck "github.com/rudderlabs/rudder-server/utils/types" ) -const ( - EventsDeliveredMetricName = "events_delivered_total" - EventsAbortedMetricName = "events_aborted_total" -) +var measurementNames map[string]string = map[string]string{ + "succeeded": "events_delivered_total", + "aborted": "events_aborted_total", +} type EventStatsReporter struct { stats stats.Stats @@ -24,26 +25,35 @@ func NewEventStatsReporter(configSubscriber *configSubscriber, stats stats.Stats } } -func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric) { +func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) { // tracking delivery event stats which has state - succeeded, aborted for index := range metrics { - if !(metrics[index].StatusDetail.Status == "aborted" || metrics[index].StatusDetail.Status == "succeeded") { - continue - } - tags := stats.Tags{ - "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), - "sourceId": metrics[index].ConnectionDetails.SourceID, - "destinationId": metrics[index].ConnectionDetails.DestinationID, - "reportedBy": metrics[index].PUDetails.PU, - "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, - "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), - "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), - "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, + if measurement, ok := measurementNames[metrics[index].StatusDetail.Status]; ok { + tags := stats.Tags{ + "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), + "sourceId": metrics[index].ConnectionDetails.SourceID, + "destinationId": metrics[index].ConnectionDetails.DestinationID, + "reportedBy": metrics[index].PUDetails.PU, + "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, + "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), + "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), + "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, + } + es.stats.NewTaggedStat(measurement, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) } - metricName := EventsDeliveredMetricName - if metrics[index].StatusDetail.Status == "aborted" { - metricName = EventsAbortedMetricName - } - es.stats.NewTaggedStat(metricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) } } + +func (es *EventStatsReporter) Report(metrics []*types.PUReportedMetric, tx *Tx) error { + tx.AddSuccessListener(func() { + es.Record(metrics) + }) + return nil +} + +func (es *EventStatsReporter) Stop() { +} + +func (es *EventStatsReporter) DatabaseSyncer(c types.SyncerConfig) types.ReportingSyncer { + return nil +} diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index c2642a2b03..43ac545c68 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -96,7 +96,7 @@ func TestEventStatsReporter(t *testing.T) { TerminalPU: false, }, StatusDetail: &types.StatusDetail{ - Count: 10, + Count: 50, Status: "aborted", StatusCode: 500, }, @@ -119,8 +119,8 @@ func TestEventStatsReporter(t *testing.T) { }, } esr := NewEventStatsReporter(cs, statsStore) - esr.Report(testReports) - require.Equal(t, statsStore.Get(EventsDeliveredMetricName, map[string]string{ + esr.Record(testReports) + require.Equal(t, statsStore.Get(measurementNames["succeeded"], map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -130,7 +130,7 @@ func TestEventStatsReporter(t *testing.T) { "status_code": "200", "destinationType": "test-destination-name", }).LastValue(), float64(10)) - require.Equal(t, statsStore.Get(EventsAbortedMetricName, map[string]string{ + require.Equal(t, statsStore.Get(measurementNames["aborted"], map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -139,17 +139,7 @@ func TestEventStatsReporter(t *testing.T) { "terminal": "false", "status_code": "500", "destinationType": "test-destination-name", - }).LastValue(), float64(10)) - require.Equal(t, statsStore.Get(EventsDeliveredMetricName, map[string]string{ - "workspaceId": workspaceID, - "sourceId": sourceID, - "destinationId": destinationID, - "reportedBy": reportedBy, - "sourceCategory": sourceCategory, - "terminal": "true", - "status_code": "200", - "destinationType": "test-destination-name", - }).LastValue(), float64(10)) // last value should be 10 because we are not reporting failed events + }).LastValue(), float64(15)) t.Cleanup(func() { cancel() diff --git a/enterprise/reporting/mediator.go b/enterprise/reporting/mediator.go index 2b110d830f..5ec59c2e7a 100644 --- a/enterprise/reporting/mediator.go +++ b/enterprise/reporting/mediator.go @@ -64,6 +64,8 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke errorIndexReporter := erridx.NewErrorIndexReporter(rm.ctx, rm.log, configSubscriber, config.Default, stats.Default) rm.reporters = append(rm.reporters, errorIndexReporter) } + eventStatsReporter := NewEventStatsReporter(configSubscriber, rm.stats) + rm.reporters = append(rm.reporters, eventStatsReporter) return rm } diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 19501541a5..2312af5f55 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -560,9 +560,6 @@ func (r *DefaultReporter) Report(metrics []*types.PUReportedMetric, txn *Tx) err return fmt.Errorf("preparing statement: %v", err) } defer func() { _ = stmt.Close() }() - txn.AddSuccessListener(func() { - r.statsReporter.Report(metrics) - }) reportedAt := time.Now().UTC().Unix() / 60 for _, metric := range metrics { diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 6d0ef75a94..04fedebcc3 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -199,7 +199,7 @@ var _ = Describe("Reporting", func() { }, } configSubscriber := newConfigSubscriber(logger.NOP) - reportHandle := NewDefaultReporter(context.Background(), logger.NOP, configSubscriber, stats.Default) + reportHandle := NewDefaultReporter(context.Background(), logger.NOP, configSubscriber, memstats.New()) aggregatedMetrics := reportHandle.getAggregatedReports(inputReports) Expect(aggregatedMetrics).To(Equal(expectedResponse)) From 68232c4f1fa582fc708c06a031f72772d31df9c8 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Wed, 8 Nov 2023 06:55:11 -0500 Subject: [PATCH 13/21] fix count in aborted test --- enterprise/reporting/event_stats_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index 43ac545c68..48f58d8760 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -139,7 +139,7 @@ func TestEventStatsReporter(t *testing.T) { "terminal": "false", "status_code": "500", "destinationType": "test-destination-name", - }).LastValue(), float64(15)) + }).LastValue(), float64(50)) t.Cleanup(func() { cancel() From 582628f81a4d50ca3d8b5f94d60d999f30feb0ea Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Wed, 8 Nov 2023 07:06:27 -0500 Subject: [PATCH 14/21] fix database syncer return --- enterprise/reporting/event_stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 29b4cdea29..0df790f1ab 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -55,5 +55,5 @@ func (es *EventStatsReporter) Stop() { } func (es *EventStatsReporter) DatabaseSyncer(c types.SyncerConfig) types.ReportingSyncer { - return nil + return func() {} } From 08bb1321817eb141f7cc285846d47ecd3baa6ce1 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Wed, 8 Nov 2023 07:29:11 -0500 Subject: [PATCH 15/21] fix delegate setup test --- enterprise/reporting/setup_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/enterprise/reporting/setup_test.go b/enterprise/reporting/setup_test.go index 1c23116370..c0e092ef0b 100644 --- a/enterprise/reporting/setup_test.go +++ b/enterprise/reporting/setup_test.go @@ -78,21 +78,21 @@ func TestSetupForDelegates(t *testing.T) { errorReportingEnabled: false, enterpriseTokenExists: true, errorIndexReportingEnabled: false, - expectedDelegates: 1, + expectedDelegates: 2, }, { reportingEnabled: true, errorReportingEnabled: true, enterpriseTokenExists: true, errorIndexReportingEnabled: false, - expectedDelegates: 2, + expectedDelegates: 3, }, { reportingEnabled: true, errorReportingEnabled: true, enterpriseTokenExists: true, errorIndexReportingEnabled: true, - expectedDelegates: 3, + expectedDelegates: 4, }, { reportingEnabled: true, From 4564996efab596e40b4a558ff32936d93c9867e3 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Wed, 8 Nov 2023 09:55:23 -0500 Subject: [PATCH 16/21] add status as label and capture only one metric --- enterprise/reporting/event_stats.go | 31 ++++++++++-------------- enterprise/reporting/event_stats_test.go | 22 +++-------------- enterprise/reporting/reporting.go | 2 -- 3 files changed, 17 insertions(+), 38 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 0df790f1ab..63b5a4c9b6 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -8,11 +8,6 @@ import ( "github.com/rudderlabs/rudder-server/utils/types" ) -var measurementNames map[string]string = map[string]string{ - "succeeded": "events_delivered_total", - "aborted": "events_aborted_total", -} - type EventStatsReporter struct { stats stats.Stats configSubscriber *configSubscriber @@ -25,22 +20,22 @@ func NewEventStatsReporter(configSubscriber *configSubscriber, stats stats.Stats } } +const EventsProcessedMetricName = "events_processed_total" + func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) { - // tracking delivery event stats which has state - succeeded, aborted for index := range metrics { - if measurement, ok := measurementNames[metrics[index].StatusDetail.Status]; ok { - tags := stats.Tags{ - "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), - "sourceId": metrics[index].ConnectionDetails.SourceID, - "destinationId": metrics[index].ConnectionDetails.DestinationID, - "reportedBy": metrics[index].PUDetails.PU, - "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, - "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), - "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), - "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, - } - es.stats.NewTaggedStat(measurement, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) + tags := stats.Tags{ + "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), + "sourceId": metrics[index].ConnectionDetails.SourceID, + "destinationId": metrics[index].ConnectionDetails.DestinationID, + "reportedBy": metrics[index].PUDetails.PU, + "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, + "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), + "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), + "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, + "status": metrics[index].StatusDetail.Status, } + es.stats.NewTaggedStat(EventsProcessedMetricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) } } diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index 48f58d8760..259b7d54c0 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -101,26 +101,10 @@ func TestEventStatsReporter(t *testing.T) { StatusCode: 500, }, }, - { - ConnectionDetails: types.ConnectionDetails{ - SourceID: "im-not-there", - DestinationID: destinationID, - SourceCategory: sourceCategory, - }, - PUDetails: types.PUDetails{ - PU: reportedBy, - TerminalPU: false, - }, - StatusDetail: &types.StatusDetail{ - Count: 100, - Status: "failed", - StatusCode: 500, - }, - }, } esr := NewEventStatsReporter(cs, statsStore) esr.Record(testReports) - require.Equal(t, statsStore.Get(measurementNames["succeeded"], map[string]string{ + require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -129,8 +113,9 @@ func TestEventStatsReporter(t *testing.T) { "terminal": "true", "status_code": "200", "destinationType": "test-destination-name", + "status": "succeeded", }).LastValue(), float64(10)) - require.Equal(t, statsStore.Get(measurementNames["aborted"], map[string]string{ + require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -139,6 +124,7 @@ func TestEventStatsReporter(t *testing.T) { "terminal": "false", "status_code": "500", "destinationType": "test-destination-name", + "status": "aborted", }).LastValue(), float64(50)) t.Cleanup(func() { diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 2312af5f55..ece23bed13 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -69,7 +69,6 @@ type DefaultReporter struct { getMinReportedAtQueryTime stats.Measurement getReportsQueryTime stats.Measurement requestLatency stats.Measurement - statsReporter *EventStatsReporter stats stats.Stats } @@ -110,7 +109,6 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber maxOpenConnections: maxOpenConnections, maxConcurrentRequests: maxConcurrentRequests, dbQueryTimeout: dbQueryTimeout, - statsReporter: NewEventStatsReporter(configSubscriber, stats), stats: stats, } } From b9c99a53742dba3c450687bc89e48ba0b18cc304 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Thu, 9 Nov 2023 08:32:15 -0500 Subject: [PATCH 17/21] remove terminal in labels and only capture terminal status except migrated --- enterprise/reporting/event_stats.go | 23 +++++----- enterprise/reporting/event_stats_test.go | 56 ++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 14 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 63b5a4c9b6..bc62bd0f42 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -24,18 +24,19 @@ const EventsProcessedMetricName = "events_processed_total" func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) { for index := range metrics { - tags := stats.Tags{ - "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), - "sourceId": metrics[index].ConnectionDetails.SourceID, - "destinationId": metrics[index].ConnectionDetails.DestinationID, - "reportedBy": metrics[index].PUDetails.PU, - "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, - "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), - "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), - "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, - "status": metrics[index].StatusDetail.Status, + if metrics[index].PUDetails.TerminalPU == true && metrics[index].StatusDetail.Status != "migrated" { + tags := stats.Tags{ + "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), + "sourceId": metrics[index].ConnectionDetails.SourceID, + "destinationId": metrics[index].ConnectionDetails.DestinationID, + "reportedBy": metrics[index].PUDetails.PU, + "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, + "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), + "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, + "status": metrics[index].StatusDetail.Status, + } + es.stats.NewTaggedStat(EventsProcessedMetricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) } - es.stats.NewTaggedStat(EventsProcessedMetricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) } } diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index 259b7d54c0..f377449659 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -93,7 +93,7 @@ func TestEventStatsReporter(t *testing.T) { }, PUDetails: types.PUDetails{ PU: reportedBy, - TerminalPU: false, + TerminalPU: true, }, StatusDetail: &types.StatusDetail{ Count: 50, @@ -101,6 +101,38 @@ func TestEventStatsReporter(t *testing.T) { StatusCode: 500, }, }, + { + ConnectionDetails: types.ConnectionDetails{ + SourceID: sourceID, + DestinationID: destinationID, + SourceCategory: sourceCategory, + }, + PUDetails: types.PUDetails{ + PU: reportedBy, + TerminalPU: true, + }, + StatusDetail: &types.StatusDetail{ + Count: 50, + Status: "migrated", + StatusCode: 500, + }, + }, + { + ConnectionDetails: types.ConnectionDetails{ + SourceID: sourceID, + DestinationID: destinationID, + SourceCategory: sourceCategory, + }, + PUDetails: types.PUDetails{ + PU: reportedBy, + TerminalPU: false, + }, + StatusDetail: &types.StatusDetail{ + Count: 50, + Status: "non-terminal", + StatusCode: 500, + }, + }, } esr := NewEventStatsReporter(cs, statsStore) esr.Record(testReports) @@ -110,7 +142,6 @@ func TestEventStatsReporter(t *testing.T) { "destinationId": destinationID, "reportedBy": reportedBy, "sourceCategory": sourceCategory, - "terminal": "true", "status_code": "200", "destinationType": "test-destination-name", "status": "succeeded", @@ -121,11 +152,30 @@ func TestEventStatsReporter(t *testing.T) { "destinationId": destinationID, "reportedBy": reportedBy, "sourceCategory": sourceCategory, - "terminal": "false", "status_code": "500", "destinationType": "test-destination-name", "status": "aborted", }).LastValue(), float64(50)) + require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ + "workspaceId": workspaceID, + "sourceId": sourceID, + "destinationId": destinationID, + "reportedBy": reportedBy, + "sourceCategory": sourceCategory, + "status_code": "500", + "destinationType": "test-destination-name", + "status": "migrated", + }), nil) + require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ + "workspaceId": workspaceID, + "sourceId": sourceID, + "destinationId": destinationID, + "reportedBy": reportedBy, + "sourceCategory": sourceCategory, + "status_code": "500", + "destinationType": "test-destination-name", + "status": "non-terminal", + }), nil) t.Cleanup(func() { cancel() From 061947370263a069b91e7bbbd65695a10bc616c4 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Thu, 9 Nov 2023 09:03:27 -0500 Subject: [PATCH 18/21] fix test in event stats --- enterprise/reporting/event_stats_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index f377449659..cdd0e64435 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -156,7 +156,7 @@ func TestEventStatsReporter(t *testing.T) { "destinationType": "test-destination-name", "status": "aborted", }).LastValue(), float64(50)) - require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ + require.Empty(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -165,8 +165,8 @@ func TestEventStatsReporter(t *testing.T) { "status_code": "500", "destinationType": "test-destination-name", "status": "migrated", - }), nil) - require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ + })) + require.Empty(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -175,7 +175,7 @@ func TestEventStatsReporter(t *testing.T) { "status_code": "500", "destinationType": "test-destination-name", "status": "non-terminal", - }), nil) + })) t.Cleanup(func() { cancel() From d3d5a2f08fa81cce4d9eafbda50e4c764cce6271 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Thu, 9 Nov 2023 09:36:33 -0500 Subject: [PATCH 19/21] nit pick case and constants --- enterprise/reporting/event_stats.go | 5 +++-- enterprise/reporting/event_stats_test.go | 21 +++++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index bc62bd0f42..a75e3d1cfc 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -4,6 +4,7 @@ import ( "strconv" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/jobsdb" . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck "github.com/rudderlabs/rudder-server/utils/types" ) @@ -24,14 +25,14 @@ const EventsProcessedMetricName = "events_processed_total" func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) { for index := range metrics { - if metrics[index].PUDetails.TerminalPU == true && metrics[index].StatusDetail.Status != "migrated" { + if metrics[index].PUDetails.TerminalPU == true && metrics[index].StatusDetail.Status != jobsdb.Migrated.State { tags := stats.Tags{ "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), "sourceId": metrics[index].ConnectionDetails.SourceID, "destinationId": metrics[index].ConnectionDetails.DestinationID, "reportedBy": metrics[index].PUDetails.PU, "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, - "status_code": strconv.Itoa(metrics[index].StatusDetail.StatusCode), + "statusCode": strconv.Itoa(metrics[index].StatusDetail.StatusCode), "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, "status": metrics[index].StatusDetail.Status, } diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index cdd0e64435..85f7f2aed0 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -10,6 +10,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats/memstats" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/jobsdb" mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" "github.com/rudderlabs/rudder-server/utils/pubsub" "github.com/rudderlabs/rudder-server/utils/types" @@ -81,7 +82,7 @@ func TestEventStatsReporter(t *testing.T) { }, StatusDetail: &types.StatusDetail{ Count: 10, - Status: "succeeded", + Status: jobsdb.Succeeded.State, StatusCode: 200, }, }, @@ -97,7 +98,7 @@ func TestEventStatsReporter(t *testing.T) { }, StatusDetail: &types.StatusDetail{ Count: 50, - Status: "aborted", + Status: jobsdb.Aborted.State, StatusCode: 500, }, }, @@ -113,7 +114,7 @@ func TestEventStatsReporter(t *testing.T) { }, StatusDetail: &types.StatusDetail{ Count: 50, - Status: "migrated", + Status: jobsdb.Migrated.State, StatusCode: 500, }, }, @@ -142,9 +143,9 @@ func TestEventStatsReporter(t *testing.T) { "destinationId": destinationID, "reportedBy": reportedBy, "sourceCategory": sourceCategory, - "status_code": "200", + "statusCode": "200", "destinationType": "test-destination-name", - "status": "succeeded", + "status": jobsdb.Succeeded.State, }).LastValue(), float64(10)) require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, @@ -152,9 +153,9 @@ func TestEventStatsReporter(t *testing.T) { "destinationId": destinationID, "reportedBy": reportedBy, "sourceCategory": sourceCategory, - "status_code": "500", + "statusCode": "500", "destinationType": "test-destination-name", - "status": "aborted", + "status": jobsdb.Aborted.State, }).LastValue(), float64(50)) require.Empty(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, @@ -162,9 +163,9 @@ func TestEventStatsReporter(t *testing.T) { "destinationId": destinationID, "reportedBy": reportedBy, "sourceCategory": sourceCategory, - "status_code": "500", + "statusCode": "500", "destinationType": "test-destination-name", - "status": "migrated", + "status": jobsdb.Migrated.State, })) require.Empty(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, @@ -172,7 +173,7 @@ func TestEventStatsReporter(t *testing.T) { "destinationId": destinationID, "reportedBy": reportedBy, "sourceCategory": sourceCategory, - "status_code": "500", + "statusCode": "500", "destinationType": "test-destination-name", "status": "non-terminal", })) From b4589e75d338b1ad8d42d2b98ca170832d20a088 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Thu, 9 Nov 2023 09:54:16 -0500 Subject: [PATCH 20/21] add terminal in metric label --- enterprise/reporting/event_stats.go | 3 ++- enterprise/reporting/event_stats_test.go | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index a75e3d1cfc..82dcba3a0c 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -25,7 +25,7 @@ const EventsProcessedMetricName = "events_processed_total" func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) { for index := range metrics { - if metrics[index].PUDetails.TerminalPU == true && metrics[index].StatusDetail.Status != jobsdb.Migrated.State { + if metrics[index].StatusDetail.Status != jobsdb.Migrated.State { tags := stats.Tags{ "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), "sourceId": metrics[index].ConnectionDetails.SourceID, @@ -33,6 +33,7 @@ func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) { "reportedBy": metrics[index].PUDetails.PU, "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, "statusCode": strconv.Itoa(metrics[index].StatusDetail.StatusCode), + "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, "status": metrics[index].StatusDetail.Status, } diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index 85f7f2aed0..b13e657a54 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -129,7 +129,7 @@ func TestEventStatsReporter(t *testing.T) { TerminalPU: false, }, StatusDetail: &types.StatusDetail{ - Count: 50, + Count: 100, Status: "non-terminal", StatusCode: 500, }, @@ -145,6 +145,7 @@ func TestEventStatsReporter(t *testing.T) { "sourceCategory": sourceCategory, "statusCode": "200", "destinationType": "test-destination-name", + "terminal": "true", "status": jobsdb.Succeeded.State, }).LastValue(), float64(10)) require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ @@ -155,6 +156,7 @@ func TestEventStatsReporter(t *testing.T) { "sourceCategory": sourceCategory, "statusCode": "500", "destinationType": "test-destination-name", + "terminal": "true", "status": jobsdb.Aborted.State, }).LastValue(), float64(50)) require.Empty(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ @@ -165,9 +167,10 @@ func TestEventStatsReporter(t *testing.T) { "sourceCategory": sourceCategory, "statusCode": "500", "destinationType": "test-destination-name", + "terminal": "true", "status": jobsdb.Migrated.State, })) - require.Empty(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ + require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -175,8 +178,9 @@ func TestEventStatsReporter(t *testing.T) { "sourceCategory": sourceCategory, "statusCode": "500", "destinationType": "test-destination-name", + "terminal": "false", "status": "non-terminal", - })) + }).LastValue(), float64(100)) t.Cleanup(func() { cancel() From 1bcc5fdb27c395a99a2973500dafe32bba6fc10b Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Thu, 9 Nov 2023 09:57:12 -0500 Subject: [PATCH 21/21] add migrated status back --- enterprise/reporting/event_stats.go | 25 +++++++++++------------- enterprise/reporting/event_stats_test.go | 8 ++++---- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/enterprise/reporting/event_stats.go b/enterprise/reporting/event_stats.go index 82dcba3a0c..074ce880d2 100644 --- a/enterprise/reporting/event_stats.go +++ b/enterprise/reporting/event_stats.go @@ -4,7 +4,6 @@ import ( "strconv" "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/rudderlabs/rudder-server/jobsdb" . "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck "github.com/rudderlabs/rudder-server/utils/types" ) @@ -25,20 +24,18 @@ const EventsProcessedMetricName = "events_processed_total" func (es *EventStatsReporter) Record(metrics []*types.PUReportedMetric) { for index := range metrics { - if metrics[index].StatusDetail.Status != jobsdb.Migrated.State { - tags := stats.Tags{ - "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), - "sourceId": metrics[index].ConnectionDetails.SourceID, - "destinationId": metrics[index].ConnectionDetails.DestinationID, - "reportedBy": metrics[index].PUDetails.PU, - "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, - "statusCode": strconv.Itoa(metrics[index].StatusDetail.StatusCode), - "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), - "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, - "status": metrics[index].StatusDetail.Status, - } - es.stats.NewTaggedStat(EventsProcessedMetricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) + tags := stats.Tags{ + "workspaceId": es.configSubscriber.WorkspaceIDFromSource(metrics[index].ConnectionDetails.SourceID), + "sourceId": metrics[index].ConnectionDetails.SourceID, + "destinationId": metrics[index].ConnectionDetails.DestinationID, + "reportedBy": metrics[index].PUDetails.PU, + "sourceCategory": metrics[index].ConnectionDetails.SourceCategory, + "statusCode": strconv.Itoa(metrics[index].StatusDetail.StatusCode), + "terminal": strconv.FormatBool(metrics[index].PUDetails.TerminalPU), + "destinationType": es.configSubscriber.GetDestDetail(metrics[index].ConnectionDetails.DestinationID).destType, + "status": metrics[index].StatusDetail.Status, } + es.stats.NewTaggedStat(EventsProcessedMetricName, stats.CountType, tags).Count(int(metrics[index].StatusDetail.Count)) } } diff --git a/enterprise/reporting/event_stats_test.go b/enterprise/reporting/event_stats_test.go index b13e657a54..175d5688bd 100644 --- a/enterprise/reporting/event_stats_test.go +++ b/enterprise/reporting/event_stats_test.go @@ -44,7 +44,7 @@ func TestEventStatsReporter(t *testing.T) { DestinationDefinition: backendconfig.DestinationDefinitionT{ Name: "test-destination-name", }, - }, // Added a comma here + }, }, }, }, @@ -113,7 +113,7 @@ func TestEventStatsReporter(t *testing.T) { TerminalPU: true, }, StatusDetail: &types.StatusDetail{ - Count: 50, + Count: 150, Status: jobsdb.Migrated.State, StatusCode: 500, }, @@ -159,7 +159,7 @@ func TestEventStatsReporter(t *testing.T) { "terminal": "true", "status": jobsdb.Aborted.State, }).LastValue(), float64(50)) - require.Empty(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ + require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID, "destinationId": destinationID, @@ -169,7 +169,7 @@ func TestEventStatsReporter(t *testing.T) { "destinationType": "test-destination-name", "terminal": "true", "status": jobsdb.Migrated.State, - })) + }).LastValue(), float64(150)) require.Equal(t, statsStore.Get(EventsProcessedMetricName, map[string]string{ "workspaceId": workspaceID, "sourceId": sourceID,