Skip to content

Commit

Permalink
chore: minor processor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Sep 15, 2023
1 parent 2c655f9 commit 2f0205c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 77 deletions.
95 changes: 20 additions & 75 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb.
}
}

func (proc *Handle) getDestTransformerEvents(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, stage string, trackingPlanEnabled, userTransformationEnabled bool) ([]transformer.TransformerEvent, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) {
func (proc *Handle) getDestTransformerEvents(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, destination *backendconfig.DestinationT, inPU, pu string) ([]transformer.TransformerEvent, []*types.PUReportedMetric, map[string]int64, map[string]MetricMetadata) {
successMetrics := make([]*types.PUReportedMetric, 0)
connectionDetailsMap := make(map[string]*types.ConnectionDetails)
statusDetailsMap := make(map[string]map[string]*types.StatusDetail)
Expand All @@ -969,8 +969,8 @@ func (proc *Handle) getDestTransformerEvents(response transformer.Response, comm
}

for _, message := range messages {
proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, stage, func() json.RawMessage {
if stage != transformer.TrackingPlanValidationStage {
proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, pu, func() json.RawMessage {
if pu != types.TRACKINGPLAN_VALIDATOR {
return []byte(`{}`)
}
if proc.transientSources.Apply(commonMetaData.SourceID) {
Expand Down Expand Up @@ -1013,30 +1013,6 @@ func (proc *Handle) getDestTransformerEvents(response transformer.Response, comm
if proc.isReportingEnabled() {
types.AssertSameKeys(connectionDetailsMap, statusDetailsMap)

var inPU, pu string
if stage == transformer.UserTransformerStage {
if trackingPlanEnabled {
inPU = types.TRACKINGPLAN_VALIDATOR
} else {
inPU = types.DESTINATION_FILTER
}
pu = types.USER_TRANSFORMER
} else if stage == transformer.TrackingPlanValidationStage {
inPU = types.DESTINATION_FILTER
pu = types.TRACKINGPLAN_VALIDATOR
} else if stage == transformer.EventFilterStage {
if userTransformationEnabled {
inPU = types.USER_TRANSFORMER
} else {
if trackingPlanEnabled {
inPU = types.TRACKINGPLAN_VALIDATOR
} else {
inPU = types.DESTINATION_FILTER
}
}
pu = types.EVENT_FILTER
}

for k, cd := range connectionDetailsMap {
for _, sd := range statusDetailsMap[k] {
m := &types.PUReportedMetric{
Expand Down Expand Up @@ -1137,7 +1113,7 @@ func (proc *Handle) updateMetricMaps(
// create status details for each validation error
// single event can have multiple validation errors of same type
veCount := len(event.ValidationErrors)
if stage == transformer.TrackingPlanValidationStage && status == jobsdb.Succeeded.State {
if stage == types.TRACKINGPLAN_VALIDATOR && status == jobsdb.Succeeded.State {
if veCount > 0 {
status = types.SUCCEEDED_WITH_VIOLATIONS
} else {
Expand Down Expand Up @@ -1173,7 +1149,7 @@ func (proc *Handle) updateMetricMaps(
}
}

func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, stage string, transformationEnabled, trackingPlanEnabled bool) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) {
func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, inPU, pu string) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) {
failedMetrics := make([]*types.PUReportedMetric, 0)
connectionDetailsMap := make(map[string]*types.ConnectionDetails)
statusDetailsMap := make(map[string]map[string]*types.StatusDetail)
Expand All @@ -1194,7 +1170,7 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta
}

for _, message := range messages {
proc.updateMetricMaps(nil, failedCountMap, connectionDetailsMap, statusDetailsMap, failedEvent, jobsdb.Aborted.State, stage, func() json.RawMessage {
proc.updateMetricMaps(nil, failedCountMap, connectionDetailsMap, statusDetailsMap, failedEvent, jobsdb.Aborted.State, pu, func() json.RawMessage {
if proc.transientSources.Apply(commonMetaData.SourceID) {
return []byte(`{}`)
}
Expand All @@ -1219,7 +1195,7 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta
"source_job_run_id": failedEvent.Metadata.SourceJobRunID,
"error": failedEvent.Error,
"status_code": failedEvent.StatusCode,
"stage": stage,
"stage": pu,
"record_id": failedEvent.Metadata.RecordID,
"source_task_run_id": failedEvent.Metadata.SourceTaskRunID,
}
Expand Down Expand Up @@ -1248,7 +1224,7 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta
procErrorStat := stats.Default.NewTaggedStat("proc_error_counts", stats.CountType, stats.Tags{
"destName": commonMetaData.DestinationType,
"statusCode": strconv.Itoa(failedEvent.StatusCode),
"stage": stage,
"stage": pu,
})

procErrorStat.Increment()
Expand All @@ -1257,33 +1233,6 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta
// REPORTING - START
if proc.isReportingEnabled() {
types.AssertSameKeys(connectionDetailsMap, statusDetailsMap)

var inPU, pu string
if stage == transformer.EventFilterStage {
if transformationEnabled {
inPU = types.USER_TRANSFORMER
} else {
if trackingPlanEnabled {
inPU = types.TRACKINGPLAN_VALIDATOR
} else {
inPU = types.DESTINATION_FILTER
}
}
pu = types.EVENT_FILTER
} else if stage == transformer.DestTransformerStage {
inPU = types.EVENT_FILTER
pu = types.DEST_TRANSFORMER
} else if stage == transformer.UserTransformerStage {
if trackingPlanEnabled {
inPU = types.TRACKINGPLAN_VALIDATOR
} else {
inPU = types.DESTINATION_FILTER
}
pu = types.USER_TRANSFORMER
} else if stage == transformer.TrackingPlanValidationStage {
inPU = types.DESTINATION_FILTER
pu = types.TRACKINGPLAN_VALIDATOR
}
for k, cd := range connectionDetailsMap {
for _, sd := range statusDetailsMap[k] {
m := &types.PUReportedMetric{
Expand Down Expand Up @@ -2208,6 +2157,12 @@ func (proc *Handle) transformSrcDest(

var response transformer.Response
var eventsToTransform []transformer.TransformerEvent
var inPU string
if trackingPlanEnabled {
inPU = types.TRACKINGPLAN_VALIDATOR

Check warning on line 2162 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L2162

Added line #L2162 was not covered by tests
} else {
inPU = types.DESTINATION_FILTER
}
// Send to custom transformer only if the destination has a transformer enabled
if transformationEnabled {
userTransformationStat := proc.newUserTransformationStat(sourceID, workspaceID, destination)
Expand All @@ -2223,8 +2178,8 @@ func (proc *Handle) transformSrcDest(
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.UserTransformerStage, trackingPlanEnabled, transformationEnabled)
failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.UserTransformerStage, transformationEnabled, trackingPlanEnabled)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.USER_TRANSFORMER)
failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, inPU, types.USER_TRANSFORMER)
proc.saveFailedJobs(failedJobs)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
Expand Down Expand Up @@ -2256,6 +2211,7 @@ func (proc *Handle) transformSrcDest(
inCountMetadataMap = successCountMetadataMap
}
// REPORTING - END
inPU = types.USER_TRANSFORMER // for the next step in the pipeline
})
} else {
proc.logger.Debug("No custom transformation")
Expand Down Expand Up @@ -2291,24 +2247,13 @@ func (proc *Handle) transformSrcDest(
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled)
failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.EVENT_FILTER)
failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, inPU, types.EVENT_FILTER)
proc.saveFailedJobs(failedJobs)
proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))

// REPORTING - START
if proc.isReportingEnabled() {
var inPU string
if transformationEnabled {
inPU = types.USER_TRANSFORMER
} else {
if trackingPlanEnabled {
inPU = types.TRACKINGPLAN_VALIDATOR
} else {
inPU = types.DESTINATION_FILTER
}
}

diffMetrics := getDiffMetrics(inPU, types.EVENT_FILTER, inCountMetadataMap, inCountMap, successCountMap, failedCountMap)
reportMetrics = append(reportMetrics, successMetrics...)
reportMetrics = append(reportMetrics, failedMetrics...)
Expand Down Expand Up @@ -2358,7 +2303,7 @@ func (proc *Handle) transformSrcDest(

failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(
response, commonMetaData, eventsByMessageID,
transformer.DestTransformerStage, transformationEnabled, trackingPlanEnabled,
types.EVENT_FILTER, types.DEST_TRANSFORMER,
)
destTransformationStat.numEvents.Count(len(eventsToTransform))
destTransformationStat.numOutputSuccessEvents.Count(len(response.Events))
Expand Down
4 changes: 2 additions & 2 deletions processor/trackingplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans
trackingPlanEnabledMap[SourceIDT(sourceID)] = true

var successMetrics []*types.PUReportedMetric
eventsToTransform, successMetrics, _, _ := proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.TrackingPlanValidationStage, true, false) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
failedJobs, failedMetrics, _ := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.TrackingPlanValidationStage, false, true)
eventsToTransform, successMetrics, _, _ := proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
failedJobs, failedMetrics, _ := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR)

Check warning on line 116 in processor/trackingplan.go

View check run for this annotation

Codecov / codecov/patch

processor/trackingplan.go#L115-L116

Added lines #L115 - L116 were not covered by tests

validationStat.numValidationSuccessEvents.Count(len(eventsToTransform))
validationStat.numValidationFailedEvents.Count(len(failedJobs))
Expand Down

0 comments on commit 2f0205c

Please sign in to comment.