Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update router reports payload behind a flag, emit stats to observe sizes #5067

Merged
merged 4 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Handle struct {
drainConcurrencyLimit config.ValueLoader[int]
workerInputBufferSize int
saveDestinationResponse bool
reportJobsdbPayload config.ValueLoader[bool]

diagnosisTickerTime time.Duration

Expand Down
1 change: 1 addition & 0 deletions router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (rt *Handle) Setup(
rt.eventOrderKeyThreshold = config.GetReloadableIntVar(200, 1, "Router."+destType+".eventOrderKeyThreshold", "Router.eventOrderKeyThreshold")
rt.eventOrderDisabledStateDuration = config.GetReloadableDurationVar(20, time.Minute, "Router."+destType+".eventOrderDisabledStateDuration", "Router.eventOrderDisabledStateDuration")
rt.eventOrderHalfEnabledStateDuration = config.GetReloadableDurationVar(10, time.Minute, "Router."+destType+".eventOrderHalfEnabledStateDuration", "Router.eventOrderHalfEnabledStateDuration")
rt.reportJobsdbPayload = config.GetReloadableBoolVar(true, "Router."+destType+".reportJobsdbPayload", "Router.reportJobsdbPayload")

statTags := stats.Tags{"destType": rt.destType}
rt.tracer = stats.Default.NewTracer("router")
Expand Down
4 changes: 2 additions & 2 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ var _ = Describe("router", func() {
Expect(metrics).To(HaveLen(1))
Expect(metrics[0].StatusDetail.StatusCode).To(Equal(200))
Expect(metrics[0].StatusDetail.Status).To(Equal(jobsdb.Succeeded.State))
Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(json.RawMessage(`{"message": "some transformed message"}`)))
Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(toRetryJobsList[0].EventPayload))
return nil
},
)
Expand Down Expand Up @@ -1458,7 +1458,7 @@ var _ = Describe("router", func() {
Expect(metrics[0].StatusDetail.StatusCode).To(Equal(500))
Expect(metrics[0].StatusDetail.Status).To(Equal(jobsdb.Failed.State))
Expect(metrics[0].StatusDetail.SampleEvent).To(Equal(json.RawMessage(gaPayload)))
Expect(metrics[0].StatusDetail.SampleResponse).To(ContainSubstring(`failureStage":"RudderStack Transformation Error"`))
Expect(metrics[0].StatusDetail.SampleResponse).To(ContainSubstring(`"routerSubStage":"router_dest_transformer"`))

return nil
},
Expand Down
27 changes: 20 additions & 7 deletions router/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ func (w *worker) processDestinationJobs() {
respBody := strings.Join(respBodyArr, " ")
respStatusCodes, respBodys = w.prepareResponsesForJobs(&destinationJob, respStatusCode, respBody)
}
stats.Default.NewTaggedStat("router_delivery_payload_size_bytes", stats.HistogramType, stats.Tags{
"destType": w.rt.destType,
"workspaceID": destinationJob.JobMetadataArray[0].WorkspaceID,
"destinationID": destinationJob.JobMetadataArray[0].DestinationID,
}).Observe(float64(len(destinationJob.Message)))
}
}
ch <- struct{}{}
Expand Down Expand Up @@ -985,13 +990,9 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa
}
}

inputPayload := payload
switch errorAt {
case routerutils.ERROR_AT_TF:
inputPayload = destinationJobMetadata.JobT.EventPayload
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "failureStage", "RudderStack Transformation Error")
default: // includes ERROR_AT_DEL, ERROR_AT_CUST
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "failureStage", "Destination Error")
inputPayload := destinationJobMetadata.JobT.EventPayload
if !w.rt.reportJobsdbPayload.Load() { // TODO: update default/remove this flag after monitoring the payload sizes
inputPayload = payload
}

status.ErrorResponse = routerutils.EnhanceJSON(status.ErrorResponse, "firstAttemptedAt", firstAttemptedAtTime.Format(misc.RFC3339Milli))
Expand All @@ -1012,6 +1013,18 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa
}
return
}
if !isSuccessStatus(respStatusCode) {
switch errorAt {
case routerutils.ERROR_AT_TF:
inputPayload = destinationJobMetadata.JobT.EventPayload
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "routerSubStage", "router_dest_transformer")
default: // includes ERROR_AT_DEL, ERROR_AT_CUST
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "routerSubStage", "router_dest_delivery")
}
// TODO: update after observing the sizes of the payloads
status.ErrorResponse = misc.UpdateJSONWithNewKeyVal(status.ErrorResponse, "payloadStage", "router_input")
}

// Saving payload to DB only
// 1. if job failed and
// 2. if router job undergoes batching or dest transform.
Expand Down
4 changes: 4 additions & 0 deletions runner/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,9 @@ var (
// 1microsecond, 2.5microsecond, 5microsecond, 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s
0.00001, 0.00025, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1,
},
"router_delivery_payload_size_bytes": {
float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB),
float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB),
},
}
)
2 changes: 1 addition & 1 deletion utils/types/reporting_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (

const MaxLengthExceeded = ":max-length-exceeded:"

var (
const (
DiffStatus = "diff"

// Module names
Expand Down
Loading