From 131bf8a686c5cac148e44b23d5e615f671647882 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Wed, 20 Sep 2023 11:27:00 +0530 Subject: [PATCH] fixup! chore: minor processor cleanup --- processor/processor_test.go | 2 +- processor/transformer/transformer.go | 15 +++++++-------- processor/transformer/transformer_test.go | 8 ++++---- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/processor/processor_test.go b/processor/processor_test.go index 156cd3db636..08bd7aaba69 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2401,7 +2401,7 @@ var _ = Describe("Static Function Tests", func() { countMap := make(map[string]int64) countMetadataMap := make(map[string]MetricMetadata) // update metric maps - proc.updateMetricMaps(countMetadataMap, countMap, connectionDetailsMap, statusDetailsMap, inputEvent, jobsdb.Succeeded.State, transformer.TrackingPlanValidationStage, func() json.RawMessage { return []byte(`{}`) }) + proc.updateMetricMaps(countMetadataMap, countMap, connectionDetailsMap, statusDetailsMap, inputEvent, jobsdb.Succeeded.State, types.TRACKINGPLAN_VALIDATOR, func() json.RawMessage { return []byte(`{}`) }) Expect(len(countMetadataMap)).To(Equal(1)) Expect(len(countMap)).To(Equal(1)) diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index aa8b0e5476a..b4025bc091c 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -32,10 +32,9 @@ import ( ) const ( - UserTransformerStage = "user_transformer" - EventFilterStage = "event_filter" - DestTransformerStage = "dest_transformer" - TrackingPlanValidationStage = "trackingPlan_validation" + userTransformerStage = "user_transformer" + destTransformerStage = "dest_transformer" + trackingPlanValidationStage = "trackingPlan_validation" ) const ( @@ -240,17 +239,17 @@ func (trans *handle) Transform(ctx context.Context, clientEvents []TransformerEv } destType := clientEvents[0].Destination.DestinationDefinition.Name transformURL := trans.destTransformURL(destType) - return trans.transform(ctx, clientEvents, transformURL, batchSize, DestTransformerStage) + return trans.transform(ctx, clientEvents, transformURL, batchSize, destTransformerStage) } // UserTransform function is used to invoke user transformer API func (trans *handle) UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response { - return trans.transform(ctx, clientEvents, trans.userTransformURL(), batchSize, UserTransformerStage) + return trans.transform(ctx, clientEvents, trans.userTransformURL(), batchSize, userTransformerStage) } // Validate function is used to invoke tracking plan validation API func (trans *handle) Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response { - return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, TrackingPlanValidationStage) + return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, trackingPlanValidationStage) } func (trans *handle) transform( @@ -464,7 +463,7 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri }, ) if err != nil { - if trans.config.failOnUserTransformTimeout.Load() && stage == UserTransformerStage && os.IsTimeout(err) { + if trans.config.failOnUserTransformTimeout.Load() && stage == userTransformerStage && os.IsTimeout(err) { return []byte(fmt.Sprintf("transformer request timed out: %s", err)), TransformerRequestTimeout } else if trans.config.failOnError.Load() { return []byte(fmt.Sprintf("transformer request failed: %s", err)), TransformerRequestFailure diff --git a/processor/transformer/transformer_test.go b/processor/transformer/transformer_test.go index f883a8bdad7..0da31a1b982 100644 --- a/processor/transformer/transformer_test.go +++ b/processor/transformer/transformer_test.go @@ -243,14 +243,14 @@ func TestTransformer(t *testing.T) { { name: "user transformation timeout", retries: 3, - stage: UserTransformerStage, + stage: userTransformerStage, expectPanic: true, failOnUserTransformTimeout: false, }, { name: "user transformation timeout with fail on timeout", retries: 3, - stage: UserTransformerStage, + stage: userTransformerStage, expectPanic: false, expectedResponse: []TransformerResponse{ { @@ -268,14 +268,14 @@ func TestTransformer(t *testing.T) { { name: "destination transformation timeout", retries: 3, - stage: DestTransformerStage, + stage: destTransformerStage, expectPanic: true, failOnUserTransformTimeout: false, }, { name: "destination transformation timeout with fail on timeout", retries: 3, - stage: DestTransformerStage, + stage: destTransformerStage, expectPanic: true, failOnUserTransformTimeout: true, },