From d3d0f32313c4b5d2eff3f83c20a74837ba3e266b Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Thu, 12 Sep 2024 17:30:36 +0530 Subject: [PATCH 1/4] fix: gateway internal batch endpoint stats --- gateway/gateway_test.go | 15 +++++++++-- gateway/handle.go | 59 +++++++++++++++++------------------------ 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index dc4cce0cd3..cd74569c90 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -40,6 +40,7 @@ import ( "github.com/rudderlabs/rudder-server/app" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/enterprise/suppress-user/model" + gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats" gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types" "github.com/rudderlabs/rudder-server/gateway/response" webhookModel "github.com/rudderlabs/rudder-server/gateway/webhook/model" @@ -1955,8 +1956,13 @@ var _ = Describe("Gateway", func() { done: make(chan<- string), requestPayload: payload, } - jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) + jobForm, stat, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) Expect(err).To(BeNil()) + Expect(stat).To(Equal(gwstats.SourceStat{ + SourceID: "sourceID", + WorkspaceID: "workspaceID", + ReqType: "batch", + })) var job struct { Batch []struct { @@ -1995,8 +2001,13 @@ var _ = Describe("Gateway", func() { done: make(chan<- string), requestPayload: payload, } - jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) + jobForm, stat, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) Expect(err).To(BeNil()) + Expect(stat).To(Equal(gwstats.SourceStat{ + SourceID: "sourceID", + WorkspaceID: "workspaceID", + ReqType: "batch", + })) var job struct { Batch []struct { diff --git a/gateway/handle.go b/gateway/handle.go index 1495684b7d..617e629a34 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -652,38 +652,31 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { status int errorMessage string responseBody string + stat = gwstats.SourceStat{ + ReqType: reqType, + } ) // TODO: add tracing gw.logger.LogRequest(r) body, err = gw.getPayloadFromRequest(r) if err != nil { - stat := gwstats.SourceStat{ - ReqType: reqType, - } stat.RequestFailed("requestBodyReadFailed") - stat.Report(gw.stats) goto requestError } - jobs, err = gw.extractJobsFromInternalBatchPayload(reqType, body) + jobs, stat, err = gw.extractJobsFromInternalBatchPayload(reqType, body) + stat.ReqType = reqType if err != nil { + stat.RequestFailed(err.Error()) goto requestError } if len(jobs) > 0 { if err = gw.storeJobs(ctx, jobs); err != nil { - gw.stats.NewTaggedStat( - "gateway.write_key_failed_events", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, "storeFailed"), - ).Count(len(jobs)) + stat.RequestEventsFailed(len(jobs), "storeFailed") goto requestError } - gw.stats.NewTaggedStat( - "gateway.write_key_successful_events", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, ""), - ).Count(len(jobs)) + stat.RequestEventsSucceeded(len(jobs)) // Sending events to config backend for _, job := range jobs { @@ -698,11 +691,6 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { status = http.StatusOK responseBody = response.GetStatus(response.Ok) - gw.stats.NewTaggedStat( - "gateway.write_key_successful_requests", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, ""), - ).Increment() gw.logger.Debugn("response", logger.NewStringField("ip", kithttputil.GetRequestIP(r)), logger.NewStringField("path", r.URL.Path), @@ -710,17 +698,14 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { logger.NewStringField("body", responseBody), ) _, _ = w.Write([]byte(responseBody)) + stat.Report(gw.stats) return requestError: + stat.Report(gw.stats) errorMessage = err.Error() status = response.GetErrorStatusCode(errorMessage) responseBody = response.GetStatus(errorMessage) - gw.stats.NewTaggedStat( - "gateway.write_key_failed_requests", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, errorMessage), - ).Increment() gw.logger.Infon("response", logger.NewStringField("ip", kithttputil.GetRequestIP(r)), logger.NewStringField("path", r.URL.Path), @@ -738,7 +723,9 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { } } -func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byte) ([]*jobsdb.JobT, error) { +func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byte) ( + []*jobsdb.JobT, gwstats.SourceStat, error, +) { type params struct { MessageID string `json:"message_id"` SourceID string `json:"source_id"` @@ -760,16 +747,17 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt messages []stream.Message isUserSuppressed = gw.memoizedIsUserSuppressed() jobs []*jobsdb.JobT + stat = gwstats.SourceStat{ReqType: reqType} ) err := jsonfast.Unmarshal(body, &messages) if err != nil { - return nil, errors.New(response.InvalidJSON) + return nil, stat, errors.New(response.InvalidJSON) } gw.requestSizeStat.Observe(float64(len(body))) if len(messages) == 0 { - return nil, errors.New(response.NotRudderEvent) + return nil, stat, errors.New(response.NotRudderEvent) } jobs = make([]*jobsdb.JobT, 0, len(messages)) @@ -778,7 +766,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt err := gw.streamMsgValidator(&msg) if err != nil { gw.logger.Errorn("invalid message in request", logger.NewErrorField(err)) - return nil, errors.New(response.InvalidStreamMessage) + return nil, stat, errors.New(response.InvalidStreamMessage) } if isUserSuppressed(msg.Properties.WorkspaceID, msg.Properties.UserID, msg.Properties.SourceID) { sourceConfig := gw.getSourceConfigFromSourceID(msg.Properties.SourceID) @@ -817,6 +805,9 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt logger.NewStringField("messageId", msg.Properties.MessageID), obskit.SourceID(msg.Properties.SourceID)) } + stat.SourceID = msg.Properties.SourceID + stat.WorkspaceID = msg.Properties.WorkspaceID + stat.WriteKey = writeKey marshalledParams, err := json.Marshal(jobsDBParams) if err != nil { @@ -831,11 +822,11 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt msg.Payload, err = fillReceivedAt(msg.Payload, msg.Properties.ReceivedAt) if err != nil { - return nil, fmt.Errorf("filling receivedAt: %w", err) + return nil, stat, fmt.Errorf("filling receivedAt: %w", err) } msg.Payload, err = fillRequestIP(msg.Payload, msg.Properties.RequestIP) if err != nil { - return nil, fmt.Errorf("filling request_ip: %w", err) + return nil, stat, fmt.Errorf("filling request_ip: %w", err) } eventBatch := singularEventBatch{ @@ -847,7 +838,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt payload, err := json.Marshal(eventBatch) if err != nil { - return nil, fmt.Errorf("marshalling event batch: %w", err) + return nil, stat, fmt.Errorf("marshalling event batch: %w", err) } jobUUID := uuid.New() jobs = append(jobs, &jobsdb.JobT{ @@ -861,10 +852,10 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt }) } if len(jobs) == 0 { // events suppressed - but return success - return nil, nil + return nil, stat, nil } - return jobs, nil + return jobs, stat, nil } func fillReceivedAt(event []byte, receivedAt time.Time) ([]byte, error) { From 3f661f9cc73fa91d07c118a2fef0836ef151b574 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 13 Sep 2024 16:30:43 +0530 Subject: [PATCH 2/4] chore: tests added for stats --- gateway/gateway_test.go | 225 ++++++++++++++++++++++++++++++++++++++++ gateway/handle.go | 23 ++-- 2 files changed, 237 insertions(+), 11 deletions(-) diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index dd726c9b8a..407cb35591 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -1838,6 +1838,96 @@ var _ = Describe("Gateway", func() { }, }, })) + Expect(statStore.GetByName("gateway.write_key_events")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_events", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_successful_events")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_successful_events", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_requests")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_requests", + Tags: map[string]string{ + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_successful_requests")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_successful_requests", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_failed_requests")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_failed_requests", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 0, + }, + })) + Expect(statStore.GetByName("gateway.write_key_failed_events")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_failed_events", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 0, + }, + })) }) It("Successful request, without debugger", func() { @@ -1860,6 +1950,23 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.NotRudderEvent)) + failedRequestStat := statStore.GetByName("gateway.write_key_failed_requests") + Expect(failedRequestStat[len(failedRequestStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_failed_requests", + Tags: map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.NotRudderEvent, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 1, + }, + )) }) It("request failed unmarshall error", func() { @@ -1872,6 +1979,25 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.InvalidJSON)) + failedRequestStat := statStore.GetByName("gateway.write_key_failed_requests") + Expect(failedRequestStat[len(failedRequestStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_failed_requests", + Tags: map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidJSON, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 1, + }, + )) + failedEventStat := statStore.GetByName("gateway.write_key_failed_events") + Expect(failedEventStat).To(HaveLen(0)) }) It("request failed message validation error", func() { @@ -1884,6 +2010,23 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.InvalidStreamMessage)) + failedRequestStat := statStore.GetByName("gateway.write_key_failed_requests") + Expect(failedRequestStat[len(failedRequestStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_failed_requests", + Tags: map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidStreamMessage, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 1, + }, + )) }) It("request success - suppressed user", func() { @@ -1893,6 +2036,22 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) + successfulReqStat := statStore.GetByName("gateway.write_key_successful_requests") + Expect(successfulReqStat[len(successfulReqStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_successful_requests", + Tags: map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 1, + }, + )) }) It("request success - multiple messages", func() { @@ -1904,6 +2063,38 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) + successfulReqStat := statStore.GetByName("gateway.write_key_successful_requests") + Expect(successfulReqStat[len(successfulReqStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_successful_requests", + Tags: map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 1, + }, + )) + successfulEventStat := statStore.GetByName("gateway.write_key_successful_events") + Expect(successfulEventStat[len(successfulEventStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_successful_events", + Tags: map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 2, + }, + )) }) It("request failed db error", func() { @@ -1914,6 +2105,40 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusInternalServerError, resp.StatusCode) + failedReqStat := statStore.GetByName("gateway.write_key_failed_requests") + Expect(failedReqStat[len(failedReqStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_failed_requests", + Tags: map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "reason": "storeFailed", + }, + Value: 1, + }, + )) + failedEventStat := statStore.GetByName("gateway.write_key_failed_events") + Expect(failedEventStat[len(failedEventStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_failed_events", + Tags: map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "reason": "storeFailed", + }, + Value: 1, + }, + )) }) }) diff --git a/gateway/handle.go b/gateway/handle.go index 3fa611e6b8..c7ff2fb8eb 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -689,6 +689,8 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { } gw.sourcehandle.RecordEvent(writeKey, job.EventPayload) } + } else { + stat.RequestEventsSucceeded(0) } status = http.StatusOK @@ -771,6 +773,16 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt gw.logger.Errorn("invalid message in request", logger.NewErrorField(err)) return nil, stat, errors.New(response.InvalidStreamMessage) } + writeKey, ok := gw.getWriteKeyFromSourceID(msg.Properties.SourceID) + if !ok { + // only live-events will not work if writeKey is not found + gw.logger.Errorn("unable to get writeKey for job", + logger.NewStringField("messageId", msg.Properties.MessageID), + obskit.SourceID(msg.Properties.SourceID)) + } + stat.SourceID = msg.Properties.SourceID + stat.WorkspaceID = msg.Properties.WorkspaceID + stat.WriteKey = writeKey if isUserSuppressed(msg.Properties.WorkspaceID, msg.Properties.UserID, msg.Properties.SourceID) { sourceConfig := gw.getSourceConfigFromSourceID(msg.Properties.SourceID) gw.logger.Infon("suppressed event", @@ -802,17 +814,6 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt SourceCategory: msg.Properties.SourceType, } - writeKey, ok := gw.getWriteKeyFromSourceID(msg.Properties.SourceID) - if !ok { - // only live-events will not work if writeKey is not found - gw.logger.Errorn("unable to get writeKey for job", - logger.NewStringField("messageId", msg.Properties.MessageID), - obskit.SourceID(msg.Properties.SourceID)) - } - stat.SourceID = msg.Properties.SourceID - stat.WorkspaceID = msg.Properties.WorkspaceID - stat.WriteKey = writeKey - marshalledParams, err := json.Marshal(jobsDBParams) if err != nil { gw.logger.Errorn("[Gateway] Failed to marshal parameters map", From 920821f47da5065d8e0072854bee2869443ecb0b Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 13 Sep 2024 16:34:45 +0530 Subject: [PATCH 3/4] fixup! chore: tests added for stats --- gateway/gateway_test.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 407cb35591..935178f695 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -2095,6 +2095,22 @@ var _ = Describe("Gateway", func() { Value: 2, }, )) + eventsStat := statStore.GetByName("gateway.write_key_events") + Expect(eventsStat[len(eventsStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_events", + Tags: map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 2, + }, + )) }) It("request failed db error", func() { @@ -2139,6 +2155,22 @@ var _ = Describe("Gateway", func() { Value: 1, }, )) + eventsStat := statStore.GetByName("gateway.write_key_events") + Expect(eventsStat[len(eventsStat)-1]).To(Equal( + memstats.Metric{ + Name: "gateway.write_key_events", + Tags: map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }, + Value: 1, + }, + )) }) }) From 4dd3f9cc440179249c9ea738bc4159a8c3a141d4 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Mon, 16 Sep 2024 13:24:46 +0530 Subject: [PATCH 4/4] chore: stat per individual job --- gateway/gateway_test.go | 322 ++++++++++++++++++---------------------- gateway/handle.go | 115 ++++++++------ 2 files changed, 212 insertions(+), 225 deletions(-) diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 935178f695..d837ae8ac7 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -1950,23 +1950,18 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.NotRudderEvent)) - failedRequestStat := statStore.GetByName("gateway.write_key_failed_requests") - Expect(failedRequestStat[len(failedRequestStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_failed_requests", - Tags: map[string]string{ - "writeKey": "", - "reqType": "internalBatch", - "reason": response.NotRudderEvent, - "workspaceId": "", - "sourceID": "", - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 1, - }, - )) + failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.NotRudderEvent, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedRequestStat).To(Not(BeNil())) + Expect(failedRequestStat.Values()).To(Equal([]float64{1})) }) It("request failed unmarshall error", func() { @@ -1979,25 +1974,29 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.InvalidJSON)) - failedRequestStat := statStore.GetByName("gateway.write_key_failed_requests") - Expect(failedRequestStat[len(failedRequestStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_failed_requests", - Tags: map[string]string{ - "writeKey": "", - "reqType": "internalBatch", - "reason": response.InvalidJSON, - "workspaceId": "", - "sourceID": "", - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 1, - }, - )) - failedEventStat := statStore.GetByName("gateway.write_key_failed_events") - Expect(failedEventStat).To(HaveLen(0)) + failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidJSON, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedRequestStat).To(Not(BeNil())) + Expect(failedRequestStat.Values()).To(Equal([]float64{1})) + failedEventStat := statStore.Get("gateway.write_key_failed_events", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidJSON, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedEventStat).To(BeNil()) }) It("request failed message validation error", func() { @@ -2010,23 +2009,18 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.InvalidStreamMessage)) - failedRequestStat := statStore.GetByName("gateway.write_key_failed_requests") - Expect(failedRequestStat[len(failedRequestStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_failed_requests", - Tags: map[string]string{ - "writeKey": "", - "reqType": "internalBatch", - "reason": response.InvalidStreamMessage, - "workspaceId": "", - "sourceID": "", - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 1, - }, - )) + failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidStreamMessage, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedRequestStat).To(Not(BeNil())) + Expect(failedRequestStat.Values()).To(Equal([]float64{1})) }) It("request success - suppressed user", func() { @@ -2036,22 +2030,17 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) - successfulReqStat := statStore.GetByName("gateway.write_key_successful_requests") - Expect(successfulReqStat[len(successfulReqStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_successful_requests", - Tags: map[string]string{ - "writeKey": WriteKeyEnabled, - "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 1, - }, - )) + successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(successfulReqStat).To(Not(BeNil())) + Expect(successfulReqStat.Values()).To(Equal([]float64{1})) }) It("request success - multiple messages", func() { @@ -2063,54 +2052,39 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) - successfulReqStat := statStore.GetByName("gateway.write_key_successful_requests") - Expect(successfulReqStat[len(successfulReqStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_successful_requests", - Tags: map[string]string{ - "writeKey": WriteKeyEnabled, - "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 1, - }, - )) - successfulEventStat := statStore.GetByName("gateway.write_key_successful_events") - Expect(successfulEventStat[len(successfulEventStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_successful_events", - Tags: map[string]string{ - "writeKey": WriteKeyEnabled, - "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 2, - }, - )) - eventsStat := statStore.GetByName("gateway.write_key_events") - Expect(eventsStat[len(eventsStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_events", - Tags: map[string]string{ - "writeKey": WriteKeyEnabled, - "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 2, - }, - )) + successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(successfulReqStat).To(Not(BeNil())) + Expect(successfulReqStat.LastValue()).To(Equal(float64(3))) + successfulEventStat := statStore.Get("gateway.write_key_successful_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(successfulEventStat).To(Not(BeNil())) + Expect(successfulEventStat.LastValue()).To(Equal(float64(3))) + eventsStat := statStore.Get("gateway.write_key_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(eventsStat).To(Not(BeNil())) + Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3})) }) It("request failed db error", func() { @@ -2121,56 +2095,41 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusInternalServerError, resp.StatusCode) - failedReqStat := statStore.GetByName("gateway.write_key_failed_requests") - Expect(failedReqStat[len(failedReqStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_failed_requests", - Tags: map[string]string{ - "writeKey": WriteKeyEnabled, - "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, - "sourceType": "", - "sdkVersion": "", - "source": "", - "reason": "storeFailed", - }, - Value: 1, - }, - )) - failedEventStat := statStore.GetByName("gateway.write_key_failed_events") - Expect(failedEventStat[len(failedEventStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_failed_events", - Tags: map[string]string{ - "writeKey": WriteKeyEnabled, - "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, - "sourceType": "", - "sdkVersion": "", - "source": "", - "reason": "storeFailed", - }, - Value: 1, - }, - )) - eventsStat := statStore.GetByName("gateway.write_key_events") - Expect(eventsStat[len(eventsStat)-1]).To(Equal( - memstats.Metric{ - Name: "gateway.write_key_events", - Tags: map[string]string{ - "writeKey": WriteKeyEnabled, - "reqType": "internalBatch", - "workspaceId": WorkspaceID, - "sourceID": SourceIDEnabled, - "sourceType": "", - "sdkVersion": "", - "source": "", - }, - Value: 1, - }, - )) + failedReqStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "reason": "storeFailed", + }) + Expect(failedReqStat).To(Not(BeNil())) + Expect(failedReqStat.Values()).To(Equal([]float64{1})) + failedEventStat := statStore.Get("gateway.write_key_failed_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "reason": "storeFailed", + }) + Expect(failedEventStat).To(Not(BeNil())) + Expect(failedEventStat.Values()).To(Equal([]float64{1})) + eventsStat := statStore.Get("gateway.write_key_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(eventsStat).To(Not(BeNil())) + Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3, 4})) }) }) @@ -2213,13 +2172,16 @@ var _ = Describe("Gateway", func() { done: make(chan<- string), requestPayload: payload, } - jobForm, stat, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) - Expect(err).To(BeNil()) - Expect(stat).To(Equal(gwstats.SourceStat{ - SourceID: "sourceID", - WorkspaceID: "workspaceID", - ReqType: "batch", - })) + jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) + Expect(err).To(BeNil()) + Expect(jobsWithStats).To(HaveLen(1)) + Expect(jobsWithStats[0].stat).To(Equal( + gwstats.SourceStat{ + SourceID: "sourceID", + WorkspaceID: "workspaceID", + ReqType: "batch", + }, + )) var job struct { Batch []struct { @@ -2227,8 +2189,8 @@ var _ = Describe("Gateway", func() { RequestIP string `json:"request_ip"` } `json:"batch"` } - Expect(jobForm).To(HaveLen(1)) - err = json.Unmarshal(jobForm[0].EventPayload, &job) + jobForm := jobsWithStats[0].job + err = json.Unmarshal(jobForm.EventPayload, &job) Expect(err).To(BeNil()) Expect(job.Batch).To(HaveLen(1)) Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("dummyReceivedAtFromPayload")) @@ -2258,9 +2220,10 @@ var _ = Describe("Gateway", func() { done: make(chan<- string), requestPayload: payload, } - jobForm, stat, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) + jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) Expect(err).To(BeNil()) - Expect(stat).To(Equal(gwstats.SourceStat{ + Expect(jobsWithStats).To(HaveLen(1)) + Expect(jobsWithStats[0].stat).To(Equal(gwstats.SourceStat{ SourceID: "sourceID", WorkspaceID: "workspaceID", ReqType: "batch", @@ -2272,8 +2235,7 @@ var _ = Describe("Gateway", func() { RequestIP string `json:"request_ip"` } `json:"batch"` } - Expect(jobForm).To(HaveLen(1)) - err = json.Unmarshal(jobForm[0].EventPayload, &job) + err = json.Unmarshal(jobsWithStats[0].job.EventPayload, &job) Expect(err).To(BeNil()) Expect(job.Batch).To(HaveLen(1)) Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("2024-01-01T01:01:01.000Z")) diff --git a/gateway/handle.go b/gateway/handle.go index c7ff2fb8eb..be2e04d712 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -646,17 +646,15 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var ( - ctx = r.Context() - reqType = ctx.Value(gwtypes.CtxParamCallType).(string) - jobs []*jobsdb.JobT - body []byte - err error - status int - errorMessage string - responseBody string - stat = gwstats.SourceStat{ - ReqType: reqType, - } + ctx = r.Context() + reqType = ctx.Value(gwtypes.CtxParamCallType).(string) + jobsWithStats []jobWithStat + body []byte + err error + status int + errorMessage string + responseBody string + stat = gwstats.SourceStat{ReqType: reqType} ) // TODO: add tracing @@ -664,33 +662,38 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { body, err = gw.getPayloadFromRequest(r) if err != nil { stat.RequestFailed("requestBodyReadFailed") + stat.Report(gw.stats) goto requestError } - jobs, stat, err = gw.extractJobsFromInternalBatchPayload(reqType, body) - stat.ReqType = reqType + jobsWithStats, err = gw.extractJobsFromInternalBatchPayload(reqType, body) if err != nil { - stat.RequestFailed(err.Error()) goto requestError } - if len(jobs) > 0 { + if len(jobsWithStats) > 0 { + jobs := lo.Map(jobsWithStats, func(jws jobWithStat, _ int) *jobsdb.JobT { + return jws.job + }) if err = gw.storeJobs(ctx, jobs); err != nil { - stat.RequestEventsFailed(len(jobs), "storeFailed") + for _, jws := range jobsWithStats { + jws.stat.RequestEventsFailed(1, "storeFailed") + jws.stat.Report(gw.stats) + } goto requestError } - stat.RequestEventsSucceeded(len(jobs)) - - // Sending events to config backend - for _, job := range jobs { - writeKey := gjson.GetBytes(job.EventPayload, "writeKey").String() - if writeKey == "" { + for _, jws := range jobsWithStats { + jws.stat.RequestEventsSucceeded(1) + jws.stat.Report(gw.stats) + // Sending events to config backend + if jws.stat.WriteKey == "" { gw.logger.Errorn("writeKey not found in event payload") continue } - gw.sourcehandle.RecordEvent(writeKey, job.EventPayload) + gw.sourcehandle.RecordEvent(jws.stat.WriteKey, jws.job.EventPayload) } } else { stat.RequestEventsSucceeded(0) + stat.Report(gw.stats) } status = http.StatusOK @@ -702,11 +705,9 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { logger.NewStringField("body", responseBody), ) _, _ = w.Write([]byte(responseBody)) - stat.Report(gw.stats) return requestError: - stat.Report(gw.stats) errorMessage = err.Error() status = response.GetErrorStatusCode(errorMessage) responseBody = response.GetStatus(errorMessage) @@ -727,8 +728,13 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { } } +type jobWithStat struct { + job *jobsdb.JobT + stat gwstats.SourceStat +} + func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byte) ( - []*jobsdb.JobT, gwstats.SourceStat, error, + []jobWithStat, error, ) { type params struct { MessageID string `json:"message_id"` @@ -751,27 +757,34 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt var ( messages []stream.Message isUserSuppressed = gw.memoizedIsUserSuppressed() - jobs []*jobsdb.JobT + res []jobWithStat stat = gwstats.SourceStat{ReqType: reqType} ) err := jsonfast.Unmarshal(body, &messages) if err != nil { - return nil, stat, errors.New(response.InvalidJSON) + stat.RequestFailed(response.InvalidJSON) + stat.Report(gw.stats) + return nil, errors.New(response.InvalidJSON) } gw.requestSizeStat.Observe(float64(len(body))) if len(messages) == 0 { - return nil, stat, errors.New(response.NotRudderEvent) + stat.RequestFailed(response.NotRudderEvent) + stat.Report(gw.stats) + return nil, errors.New(response.NotRudderEvent) } - jobs = make([]*jobsdb.JobT, 0, len(messages)) + res = make([]jobWithStat, 0, len(messages)) for _, msg := range messages { + stat := gwstats.SourceStat{ReqType: reqType} err := gw.streamMsgValidator(&msg) if err != nil { gw.logger.Errorn("invalid message in request", logger.NewErrorField(err)) - return nil, stat, errors.New(response.InvalidStreamMessage) + stat.RequestEventsFailed(1, response.InvalidStreamMessage) + stat.Report(gw.stats) + return nil, errors.New(response.InvalidStreamMessage) } writeKey, ok := gw.getWriteKeyFromSourceID(msg.Properties.SourceID) if !ok { @@ -827,11 +840,17 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt msg.Payload, err = fillReceivedAt(msg.Payload, msg.Properties.ReceivedAt) if err != nil { - return nil, stat, fmt.Errorf("filling receivedAt: %w", err) + err = fmt.Errorf("filling receivedAt: %w", err) + stat.RequestEventsFailed(1, err.Error()) + stat.Report(gw.stats) + return nil, fmt.Errorf("filling receivedAt: %w", err) } msg.Payload, err = fillRequestIP(msg.Payload, msg.Properties.RequestIP) if err != nil { - return nil, stat, fmt.Errorf("filling request_ip: %w", err) + err = fmt.Errorf("filling request_ip: %w", err) + stat.RequestEventsFailed(1, err.Error()) + stat.Report(gw.stats) + return nil, fmt.Errorf("filling request_ip: %w", err) } eventBatch := singularEventBatch{ @@ -843,24 +862,30 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt payload, err := json.Marshal(eventBatch) if err != nil { - return nil, stat, fmt.Errorf("marshalling event batch: %w", err) + err = fmt.Errorf("marshalling event batch: %w", err) + stat.RequestEventsFailed(1, err.Error()) + stat.Report(gw.stats) + return nil, fmt.Errorf("marshalling event batch: %w", err) } jobUUID := uuid.New() - jobs = append(jobs, &jobsdb.JobT{ - UUID: jobUUID, - UserID: msg.Properties.RoutingKey, - Parameters: marshalledParams, - CustomVal: customVal, - EventPayload: payload, - EventCount: len(eventBatch.Batch), - WorkspaceId: msg.Properties.WorkspaceID, + res = append(res, jobWithStat{ + stat: stat, + job: &jobsdb.JobT{ + UUID: jobUUID, + UserID: msg.Properties.RoutingKey, + Parameters: marshalledParams, + CustomVal: customVal, + EventPayload: payload, + EventCount: len(eventBatch.Batch), + WorkspaceId: msg.Properties.WorkspaceID, + }, }) } - if len(jobs) == 0 { // events suppressed - but return success - return nil, stat, nil + if len(res) == 0 { // events suppressed - but return success + return nil, nil } - return jobs, stat, nil + return res, nil } func fillReceivedAt(event []byte, receivedAt time.Time) ([]byte, error) {