Skip to content

Commit

Permalink
chore: fix status and stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohith BCS authored and Rohith BCS committed Oct 22, 2024
1 parent 99e128d commit 3fdf8b8
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1633,6 +1633,7 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
proc.stats.statNumRequests(partition).Count(len(jobList))

var statusList []*jobsdb.JobStatusT
statusMap := make(map[int64]bool)

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

declared and not used: statusMap) (typecheck)

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

declared and not used: statusMap) (typecheck)

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

declared and not used: statusMap (typecheck)

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (oss)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (datalake)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (azure-synapse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (redshift)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (clickhouse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (postgres)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (mssql)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (deltalake)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (snowflake)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Warehouse Integration (bigquery)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/reporting_dropped_events)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/reporting_dropped_events)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/docker_test)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/docker_test)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/backendconfigunavailability)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/backendconfigunavailability)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/trackedusersreporting)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/trackedusersreporting)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/reporting_error_index)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/reporting_error_index)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (processor)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (processor)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (processor)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (processor)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/tracing)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (integration_test/tracing)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (warehouse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (warehouse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (warehouse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (warehouse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (warehouse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (warehouse)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (router)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (router)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (router)

declared and not used: statusMap

Check failure on line 1636 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Package Unit (router)

declared and not used: statusMap
groupedEvents := make(map[string][]transformer.TransformerEvent)
groupedEventsBySourceId := make(map[SourceIDT][]transformer.TransformerEvent)
eventsByMessageID := make(map[string]types.SingularEventWithReceivedAt)
Expand Down Expand Up @@ -1713,6 +1714,20 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
spans = append(spans, span)
}

newStatus := jobsdb.JobStatusT{
JobID: batchEvent.JobID,
JobState: jobsdb.Succeeded.State,
AttemptNum: 1,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "200",
ErrorResponse: []byte(`{"success":"OK"}`),
Parameters: []byte(`{}`),
JobParameters: batchEvent.Parameters,
WorkspaceId: batchEvent.WorkspaceId,
}
statusList = append(statusList, &newStatus)

parameters := batchEvent.Parameters
var gatewayBatchEvent types.GatewayBatchRequest
if err := jsonfast.Unmarshal(batchEvent.EventPayload, &gatewayBatchEvent); err != nil {
Expand All @@ -1721,10 +1736,16 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
}
proc.logger.Warnw("json parsing of event payload", "jobID", batchEvent.JobID, "error", err)
gatewayBatchEvent.Batch = []types.SingularEventT{}
continue
}
requestIP := gatewayBatchEvent.RequestIP
receivedAt := gatewayBatchEvent.ReceivedAt

proc.statsFactory.NewSampledTaggedStat("processor.event_pickup_lag_seconds", stats.TimerType, stats.Tags{
"sourceId": eventParams.SourceId,
"workspaceId": batchEvent.WorkspaceId,
}).Since(receivedAt)

source, err := proc.getSourceBySourceID(eventParams.SourceId)
if err != nil {
if span != nil {
Expand Down Expand Up @@ -1765,10 +1786,6 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
customVal: batchEvent.CustomVal,
})
dedupKeysWithWorkspaceID = append(dedupKeysWithWorkspaceID, dedupKey)
proc.statsFactory.NewSampledTaggedStat("processor.event_pickup_lag_seconds", stats.TimerType, stats.Tags{
"sourceId": eventParams.SourceId,
"workspaceId": batchEvent.WorkspaceId,
}).Since(receivedAt)
}
}

Expand All @@ -1786,20 +1803,6 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
if event.eventParams.DestinationID != "" {
jobIDToSpecificDestMapOnly[event.jobID] = event.eventParams.DestinationID
}
newStatus := jobsdb.JobStatusT{
JobID: event.jobID,
JobState: jobsdb.Succeeded.State,
AttemptNum: 1,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "200",
ErrorResponse: []byte(`{"success":"OK"}`),
Parameters: []byte(`{}`),
JobParameters: event.parameters,
WorkspaceId: event.workspaceID,
}
statusList = append(statusList, &newStatus)

payloadFunc := ro.Memoize(func() json.RawMessage {
payloadBytes, err := jsonfast.Marshal(event.singularEvent)
if err != nil {
Expand Down

0 comments on commit 3fdf8b8

Please sign in to comment.