Skip to content

Commit

Permalink
fixup! chore: minor processor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Sep 20, 2023
1 parent 886479b commit 131bf8a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
2 changes: 1 addition & 1 deletion processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2401,7 +2401,7 @@ var _ = Describe("Static Function Tests", func() {
countMap := make(map[string]int64)
countMetadataMap := make(map[string]MetricMetadata)
// update metric maps
proc.updateMetricMaps(countMetadataMap, countMap, connectionDetailsMap, statusDetailsMap, inputEvent, jobsdb.Succeeded.State, transformer.TrackingPlanValidationStage, func() json.RawMessage { return []byte(`{}`) })
proc.updateMetricMaps(countMetadataMap, countMap, connectionDetailsMap, statusDetailsMap, inputEvent, jobsdb.Succeeded.State, types.TRACKINGPLAN_VALIDATOR, func() json.RawMessage { return []byte(`{}`) })

Expect(len(countMetadataMap)).To(Equal(1))
Expect(len(countMap)).To(Equal(1))
Expand Down
15 changes: 7 additions & 8 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ import (
)

const (
UserTransformerStage = "user_transformer"
EventFilterStage = "event_filter"
DestTransformerStage = "dest_transformer"
TrackingPlanValidationStage = "trackingPlan_validation"
userTransformerStage = "user_transformer"
destTransformerStage = "dest_transformer"
trackingPlanValidationStage = "trackingPlan_validation"
)

const (
Expand Down Expand Up @@ -240,17 +239,17 @@ func (trans *handle) Transform(ctx context.Context, clientEvents []TransformerEv
}
destType := clientEvents[0].Destination.DestinationDefinition.Name
transformURL := trans.destTransformURL(destType)
return trans.transform(ctx, clientEvents, transformURL, batchSize, DestTransformerStage)
return trans.transform(ctx, clientEvents, transformURL, batchSize, destTransformerStage)
}

// UserTransform function is used to invoke user transformer API
func (trans *handle) UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response {
return trans.transform(ctx, clientEvents, trans.userTransformURL(), batchSize, UserTransformerStage)
return trans.transform(ctx, clientEvents, trans.userTransformURL(), batchSize, userTransformerStage)
}

// Validate function is used to invoke tracking plan validation API
func (trans *handle) Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response {
return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, TrackingPlanValidationStage)
return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, trackingPlanValidationStage)
}

func (trans *handle) transform(
Expand Down Expand Up @@ -464,7 +463,7 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri
},
)
if err != nil {
if trans.config.failOnUserTransformTimeout.Load() && stage == UserTransformerStage && os.IsTimeout(err) {
if trans.config.failOnUserTransformTimeout.Load() && stage == userTransformerStage && os.IsTimeout(err) {
return []byte(fmt.Sprintf("transformer request timed out: %s", err)), TransformerRequestTimeout
} else if trans.config.failOnError.Load() {
return []byte(fmt.Sprintf("transformer request failed: %s", err)), TransformerRequestFailure
Expand Down
8 changes: 4 additions & 4 deletions processor/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,14 @@ func TestTransformer(t *testing.T) {
{
name: "user transformation timeout",
retries: 3,
stage: UserTransformerStage,
stage: userTransformerStage,
expectPanic: true,
failOnUserTransformTimeout: false,
},
{
name: "user transformation timeout with fail on timeout",
retries: 3,
stage: UserTransformerStage,
stage: userTransformerStage,
expectPanic: false,
expectedResponse: []TransformerResponse{
{
Expand All @@ -268,14 +268,14 @@ func TestTransformer(t *testing.T) {
{
name: "destination transformation timeout",
retries: 3,
stage: DestTransformerStage,
stage: destTransformerStage,
expectPanic: true,
failOnUserTransformTimeout: false,
},
{
name: "destination transformation timeout with fail on timeout",
retries: 3,
stage: DestTransformerStage,
stage: destTransformerStage,
expectPanic: true,
failOnUserTransformTimeout: true,
},
Expand Down

0 comments on commit 131bf8a

Please sign in to comment.