Skip to content

Commit

Permalink
chore: skip rETL ETL from rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Nov 2, 2023
1 parent c45c46d commit fdda3a5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
3 changes: 2 additions & 1 deletion gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,10 +884,11 @@ var _ = Describe("Gateway", func() {
})

It("should reject messages if rate limit is reached for workspace", func() {
conf.Set("Gateway.allowReqsWithoutUserIDAndAnonymousID", true)
c.mockRateLimiter.EXPECT().CheckLimitReached(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(1)
expectHandlerResponse(
gateway.webAliasHandler(),
authorizedRequest(WriteKeyEnabled, bytes.NewBufferString("{}")),
authorizedRequest(WriteKeyEnabled, bytes.NewBufferString(`{"data": "valid-json"}`)),
http.StatusTooManyRequests,
response.TooManyRequests+"\n",
"alias",
Expand Down
29 changes: 15 additions & 14 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
userIDHeader = req.userIDHeader
ipAddr = req.ipAddr
body = req.requestPayload

// values retrieved from first event in batch
sourcesJobRunID, sourcesTaskRunID = req.authContext.SourceJobRunID, req.authContext.SourceTaskRunID
)

fillMessageID := func(event map[string]interface{}) {
Expand Down Expand Up @@ -295,18 +298,6 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
eventsBatch := gjson.GetBytes(body, "batch").Array()
jobData.numEvents = len(eventsBatch)

if gw.conf.enableRateLimit.Load() {
// In case of "batch" requests, if rate-limiter returns true for LimitReached, just drop the event batch and continue.
ok, errCheck := gw.rateLimiter.CheckLimitReached(context.TODO(), workspaceId, int64(len(eventsBatch)))
if errCheck != nil {
gw.stats.NewTaggedStat("gateway.rate_limiter_error", stats.CountType, stats.Tags{"workspaceId": workspaceId}).Increment()
gw.logger.Errorf("Rate limiter error: %v Allowing the request", errCheck)
}
if ok {
return jobData, errRequestDropped
}
}

type jobObject struct {
userID string
events []map[string]interface{}
Expand All @@ -317,8 +308,6 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
out []jobObject

marshalledParams []byte
// values retrieved from first event in batch
sourcesJobRunID, sourcesTaskRunID = req.authContext.SourceJobRunID, req.authContext.SourceTaskRunID

// facts about the batch populated as we iterate over events
containsAudienceList, suppressed bool
Expand Down Expand Up @@ -408,6 +397,18 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
})
}

if gw.conf.enableRateLimit.Load() && sourcesJobRunID == "" && sourcesTaskRunID == "" {
// In case of "batch" requests, if rate-limiter returns true for LimitReached, just drop the event batch and continue.
ok, errCheck := gw.rateLimiter.CheckLimitReached(context.TODO(), workspaceId, int64(len(eventsBatch)))
if errCheck != nil {
gw.stats.NewTaggedStat("gateway.rate_limiter_error", stats.CountType, stats.Tags{"workspaceId": workspaceId}).Increment()
gw.logger.Errorf("Rate limiter error: %v Allowing the request", errCheck)
}
if ok {
return jobData, errRequestDropped
}
}

if len(out) == 0 && suppressed {
err = errRequestSuppressed
return
Expand Down

0 comments on commit fdda3a5

Please sign in to comment.