diff --git a/gateway/gateway_test.go b/gateway/gateway_test.go index 09a1baa012..d837ae8ac7 100644 --- a/gateway/gateway_test.go +++ b/gateway/gateway_test.go @@ -40,6 +40,7 @@ import ( "github.com/rudderlabs/rudder-server/app" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/enterprise/suppress-user/model" + gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats" gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types" "github.com/rudderlabs/rudder-server/gateway/response" webhookModel "github.com/rudderlabs/rudder-server/gateway/webhook/model" @@ -1837,6 +1838,96 @@ var _ = Describe("Gateway", func() { }, }, })) + Expect(statStore.GetByName("gateway.write_key_events")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_events", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_successful_events")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_successful_events", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_requests")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_requests", + Tags: map[string]string{ + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_successful_requests")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_successful_requests", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 1, + }, + })) + Expect(statStore.GetByName("gateway.write_key_failed_requests")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_failed_requests", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 0, + }, + })) + Expect(statStore.GetByName("gateway.write_key_failed_events")).To(Equal([]memstats.Metric{ + { + Name: "gateway.write_key_failed_events", + Tags: map[string]string{ + "source": "", + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + }, + Value: 0, + }, + })) }) It("Successful request, without debugger", func() { @@ -1859,6 +1950,18 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.NotRudderEvent)) + failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.NotRudderEvent, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedRequestStat).To(Not(BeNil())) + Expect(failedRequestStat.Values()).To(Equal([]float64{1})) }) It("request failed unmarshall error", func() { @@ -1871,6 +1974,29 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.InvalidJSON)) + failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidJSON, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedRequestStat).To(Not(BeNil())) + Expect(failedRequestStat.Values()).To(Equal([]float64{1})) + failedEventStat := statStore.Get("gateway.write_key_failed_events", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidJSON, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedEventStat).To(BeNil()) }) It("request failed message validation error", func() { @@ -1883,6 +2009,18 @@ var _ = Describe("Gateway", func() { defer httputil.CloseResponse(resp) Expect(err).To(BeNil()) Expect(string(respData)).Should(ContainSubstring(response.InvalidStreamMessage)) + failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": "", + "reqType": "internalBatch", + "reason": response.InvalidStreamMessage, + "workspaceId": "", + "sourceID": "", + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(failedRequestStat).To(Not(BeNil())) + Expect(failedRequestStat.Values()).To(Equal([]float64{1})) }) It("request success - suppressed user", func() { @@ -1892,6 +2030,17 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) + successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(successfulReqStat).To(Not(BeNil())) + Expect(successfulReqStat.Values()).To(Equal([]float64{1})) }) It("request success - multiple messages", func() { @@ -1903,6 +2052,39 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusOK, resp.StatusCode) + successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(successfulReqStat).To(Not(BeNil())) + Expect(successfulReqStat.LastValue()).To(Equal(float64(3))) + successfulEventStat := statStore.Get("gateway.write_key_successful_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(successfulEventStat).To(Not(BeNil())) + Expect(successfulEventStat.LastValue()).To(Equal(float64(3))) + eventsStat := statStore.Get("gateway.write_key_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(eventsStat).To(Not(BeNil())) + Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3})) }) It("request failed db error", func() { @@ -1913,6 +2095,41 @@ var _ = Describe("Gateway", func() { resp, err := client.Do(req) Expect(err).To(BeNil()) Expect(http.StatusInternalServerError, resp.StatusCode) + failedReqStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "reason": "storeFailed", + }) + Expect(failedReqStat).To(Not(BeNil())) + Expect(failedReqStat.Values()).To(Equal([]float64{1})) + failedEventStat := statStore.Get("gateway.write_key_failed_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + "reason": "storeFailed", + }) + Expect(failedEventStat).To(Not(BeNil())) + Expect(failedEventStat.Values()).To(Equal([]float64{1})) + eventsStat := statStore.Get("gateway.write_key_events", map[string]string{ + "writeKey": WriteKeyEnabled, + "reqType": "internalBatch", + "workspaceId": WorkspaceID, + "sourceID": SourceIDEnabled, + "sourceType": "", + "sdkVersion": "", + "source": "", + }) + Expect(eventsStat).To(Not(BeNil())) + Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3, 4})) }) }) @@ -1955,8 +2172,16 @@ var _ = Describe("Gateway", func() { done: make(chan<- string), requestPayload: payload, } - jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) - Expect(err).To(BeNil()) + jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) + Expect(err).To(BeNil()) + Expect(jobsWithStats).To(HaveLen(1)) + Expect(jobsWithStats[0].stat).To(Equal( + gwstats.SourceStat{ + SourceID: "sourceID", + WorkspaceID: "workspaceID", + ReqType: "batch", + }, + )) var job struct { Batch []struct { @@ -1964,8 +2189,8 @@ var _ = Describe("Gateway", func() { RequestIP string `json:"request_ip"` } `json:"batch"` } - Expect(jobForm).To(HaveLen(1)) - err = json.Unmarshal(jobForm[0].EventPayload, &job) + jobForm := jobsWithStats[0].job + err = json.Unmarshal(jobForm.EventPayload, &job) Expect(err).To(BeNil()) Expect(job.Batch).To(HaveLen(1)) Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("dummyReceivedAtFromPayload")) @@ -1995,8 +2220,14 @@ var _ = Describe("Gateway", func() { done: make(chan<- string), requestPayload: payload, } - jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) + jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload) Expect(err).To(BeNil()) + Expect(jobsWithStats).To(HaveLen(1)) + Expect(jobsWithStats[0].stat).To(Equal(gwstats.SourceStat{ + SourceID: "sourceID", + WorkspaceID: "workspaceID", + ReqType: "batch", + })) var job struct { Batch []struct { @@ -2004,8 +2235,7 @@ var _ = Describe("Gateway", func() { RequestIP string `json:"request_ip"` } `json:"batch"` } - Expect(jobForm).To(HaveLen(1)) - err = json.Unmarshal(jobForm[0].EventPayload, &job) + err = json.Unmarshal(jobsWithStats[0].job.EventPayload, &job) Expect(err).To(BeNil()) Expect(job.Batch).To(HaveLen(1)) Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("2024-01-01T01:01:01.000Z")) diff --git a/gateway/handle.go b/gateway/handle.go index 42c75e3dd0..be2e04d712 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -646,65 +646,58 @@ func (gw *Handle) addToWebRequestQ(_ *http.ResponseWriter, req *http.Request, do func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var ( - ctx = r.Context() - reqType = ctx.Value(gwtypes.CtxParamCallType).(string) - jobs []*jobsdb.JobT - body []byte - err error - status int - errorMessage string - responseBody string + ctx = r.Context() + reqType = ctx.Value(gwtypes.CtxParamCallType).(string) + jobsWithStats []jobWithStat + body []byte + err error + status int + errorMessage string + responseBody string + stat = gwstats.SourceStat{ReqType: reqType} ) // TODO: add tracing gw.logger.LogRequest(r) body, err = gw.getPayloadFromRequest(r) if err != nil { - stat := gwstats.SourceStat{ - ReqType: reqType, - } stat.RequestFailed("requestBodyReadFailed") stat.Report(gw.stats) goto requestError } - jobs, err = gw.extractJobsFromInternalBatchPayload(reqType, body) + jobsWithStats, err = gw.extractJobsFromInternalBatchPayload(reqType, body) if err != nil { goto requestError } - if len(jobs) > 0 { + if len(jobsWithStats) > 0 { + jobs := lo.Map(jobsWithStats, func(jws jobWithStat, _ int) *jobsdb.JobT { + return jws.job + }) if err = gw.storeJobs(ctx, jobs); err != nil { - gw.stats.NewTaggedStat( - "gateway.write_key_failed_events", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, "storeFailed"), - ).Count(len(jobs)) + for _, jws := range jobsWithStats { + jws.stat.RequestEventsFailed(1, "storeFailed") + jws.stat.Report(gw.stats) + } goto requestError } - gw.stats.NewTaggedStat( - "gateway.write_key_successful_events", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, ""), - ).Count(len(jobs)) - - // Sending events to config backend - for _, job := range jobs { - writeKey := gjson.GetBytes(job.EventPayload, "writeKey").String() - if writeKey == "" { + for _, jws := range jobsWithStats { + jws.stat.RequestEventsSucceeded(1) + jws.stat.Report(gw.stats) + // Sending events to config backend + if jws.stat.WriteKey == "" { gw.logger.Errorn("writeKey not found in event payload") continue } - gw.sourcehandle.RecordEvent(writeKey, job.EventPayload) + gw.sourcehandle.RecordEvent(jws.stat.WriteKey, jws.job.EventPayload) } + } else { + stat.RequestEventsSucceeded(0) + stat.Report(gw.stats) } status = http.StatusOK responseBody = response.GetStatus(response.Ok) - gw.stats.NewTaggedStat( - "gateway.write_key_successful_requests", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, ""), - ).Increment() gw.logger.Debugn("response", logger.NewStringField("ip", kithttputil.GetRequestIP(r)), logger.NewStringField("path", r.URL.Path), @@ -718,11 +711,6 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { errorMessage = err.Error() status = response.GetErrorStatusCode(errorMessage) responseBody = response.GetStatus(errorMessage) - gw.stats.NewTaggedStat( - "gateway.write_key_failed_requests", - stats.CountType, - gw.newReqTypeStatsTagsWithReason(reqType, errorMessage), - ).Increment() gw.logger.Infon("response", logger.NewStringField("ip", kithttputil.GetRequestIP(r)), logger.NewStringField("path", r.URL.Path), @@ -740,7 +728,14 @@ func (gw *Handle) internalBatchHandlerFunc() http.HandlerFunc { } } -func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byte) ([]*jobsdb.JobT, error) { +type jobWithStat struct { + job *jobsdb.JobT + stat gwstats.SourceStat +} + +func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byte) ( + []jobWithStat, error, +) { type params struct { MessageID string `json:"message_id"` SourceID string `json:"source_id"` @@ -762,27 +757,45 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt var ( messages []stream.Message isUserSuppressed = gw.memoizedIsUserSuppressed() - jobs []*jobsdb.JobT + res []jobWithStat + stat = gwstats.SourceStat{ReqType: reqType} ) err := jsonfast.Unmarshal(body, &messages) if err != nil { + stat.RequestFailed(response.InvalidJSON) + stat.Report(gw.stats) return nil, errors.New(response.InvalidJSON) } gw.requestSizeStat.Observe(float64(len(body))) if len(messages) == 0 { + stat.RequestFailed(response.NotRudderEvent) + stat.Report(gw.stats) return nil, errors.New(response.NotRudderEvent) } - jobs = make([]*jobsdb.JobT, 0, len(messages)) + res = make([]jobWithStat, 0, len(messages)) for _, msg := range messages { + stat := gwstats.SourceStat{ReqType: reqType} err := gw.streamMsgValidator(&msg) if err != nil { gw.logger.Errorn("invalid message in request", logger.NewErrorField(err)) + stat.RequestEventsFailed(1, response.InvalidStreamMessage) + stat.Report(gw.stats) return nil, errors.New(response.InvalidStreamMessage) } + writeKey, ok := gw.getWriteKeyFromSourceID(msg.Properties.SourceID) + if !ok { + // only live-events will not work if writeKey is not found + gw.logger.Errorn("unable to get writeKey for job", + logger.NewStringField("messageId", msg.Properties.MessageID), + obskit.SourceID(msg.Properties.SourceID)) + } + stat.SourceID = msg.Properties.SourceID + stat.WorkspaceID = msg.Properties.WorkspaceID + stat.WriteKey = writeKey if isUserSuppressed(msg.Properties.WorkspaceID, msg.Properties.UserID, msg.Properties.SourceID) { sourceConfig := gw.getSourceConfigFromSourceID(msg.Properties.SourceID) gw.logger.Infon("suppressed event", @@ -814,14 +827,6 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt SourceCategory: msg.Properties.SourceType, } - writeKey, ok := gw.getWriteKeyFromSourceID(msg.Properties.SourceID) - if !ok { - // only live-events will not work if writeKey is not found - gw.logger.Errorn("unable to get writeKey for job", - logger.NewStringField("messageId", msg.Properties.MessageID), - obskit.SourceID(msg.Properties.SourceID)) - } - marshalledParams, err := json.Marshal(jobsDBParams) if err != nil { gw.logger.Errorn("[Gateway] Failed to marshal parameters map", @@ -835,10 +840,16 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt msg.Payload, err = fillReceivedAt(msg.Payload, msg.Properties.ReceivedAt) if err != nil { + err = fmt.Errorf("filling receivedAt: %w", err) + stat.RequestEventsFailed(1, err.Error()) + stat.Report(gw.stats) return nil, fmt.Errorf("filling receivedAt: %w", err) } msg.Payload, err = fillRequestIP(msg.Payload, msg.Properties.RequestIP) if err != nil { + err = fmt.Errorf("filling request_ip: %w", err) + stat.RequestEventsFailed(1, err.Error()) + stat.Report(gw.stats) return nil, fmt.Errorf("filling request_ip: %w", err) } @@ -851,24 +862,30 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt payload, err := json.Marshal(eventBatch) if err != nil { + err = fmt.Errorf("marshalling event batch: %w", err) + stat.RequestEventsFailed(1, err.Error()) + stat.Report(gw.stats) return nil, fmt.Errorf("marshalling event batch: %w", err) } jobUUID := uuid.New() - jobs = append(jobs, &jobsdb.JobT{ - UUID: jobUUID, - UserID: msg.Properties.RoutingKey, - Parameters: marshalledParams, - CustomVal: customVal, - EventPayload: payload, - EventCount: len(eventBatch.Batch), - WorkspaceId: msg.Properties.WorkspaceID, + res = append(res, jobWithStat{ + stat: stat, + job: &jobsdb.JobT{ + UUID: jobUUID, + UserID: msg.Properties.RoutingKey, + Parameters: marshalledParams, + CustomVal: customVal, + EventPayload: payload, + EventCount: len(eventBatch.Batch), + WorkspaceId: msg.Properties.WorkspaceID, + }, }) } - if len(jobs) == 0 { // events suppressed - but return success + if len(res) == 0 { // events suppressed - but return success return nil, nil } - return jobs, nil + return res, nil } func fillReceivedAt(event []byte, receivedAt time.Time) ([]byte, error) { diff --git a/gateway/handle_observability.go b/gateway/handle_observability.go index e32c517e38..e8f3b821ee 100644 --- a/gateway/handle_observability.go +++ b/gateway/handle_observability.go @@ -34,13 +34,3 @@ func (gw *Handle) newSourceStatTagsWithReason(s *backendconfig.SourceT, reqType, } return tags } - -func (gw *Handle) newReqTypeStatsTagsWithReason(reqType, reason string) stats.Tags { - tags := stats.Tags{ - "req_type": reqType, - } - if reason != "" { - tags["reason"] = reason - } - return tags -}