From 90fdddf5ca76ffecc52798fddd2490df4979cf05 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 6 Oct 2023 13:02:32 +0530 Subject: [PATCH] chore: update processor logic to accomodate jobRunID aborts --- processor/processor.go | 52 ++++++++++++++--- processor/processor_test.go | 69 ++++++++++++++++++++++ router/batchrouter/handle.go | 1 + router/batchrouter/handle_lifecycle.go | 1 + router/batchrouter/worker.go | 10 +++- router/handle_lifecycle.go | 1 + router/router_test.go | 79 ++++++++++++++++++++++++++ router/types.go | 4 +- router/utils/utils.go | 31 +++++----- router/worker.go | 10 +++- 10 files changed, 234 insertions(+), 24 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index c08a5f4f6b7..fc60541960f 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -140,6 +140,8 @@ type Handle struct { eventSchemaV2Enabled bool archivalEnabled misc.ValueLoader[bool] eventAuditEnabled map[string]bool + + toAbortJobRunIDs misc.ValueLoader[string] } adaptiveLimit func(int64) int64 @@ -2582,11 +2584,27 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, supportedMessageTypesCache := make(map[string]*cacheValue) supportedMessageEventsCache := make(map[string]*cacheValue) + toAbortJobRunIDs := config.GetStringVar("", "RSources.toAbortJobRunIDs") + // filter unsupported message types - var resp transformer.TransformerResponse for i := range events { event := &events[i] + if toAbortJobRunIDs != "" { + abortIDs := strings.Split(toAbortJobRunIDs, ",") + if slices.Contains(abortIDs, event.Metadata.SourceJobRunID) { + failedEvents = append(failedEvents, + transformer.TransformerResponse{ + Output: event.Message, + StatusCode: 400, + Metadata: event.Metadata, + Error: "jobRunID configured to abort", + }, + ) + continue + } + } + if filter { // filter unsupported message types supportedTypes, ok := supportedMessageTypesCache[event.Destination.ID] @@ -2616,21 +2634,41 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, messageEvent, typOk := event.Message["event"].(string) if !typOk { // add to FailedEvents - resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 400, Metadata: event.Metadata, Error: "Invalid message event. Type assertion failed"} - failedEvents = append(failedEvents, resp) + failedEvents = append( + failedEvents, + transformer.TransformerResponse{ + Output: event.Message, + StatusCode: 400, + Metadata: event.Metadata, + Error: "Invalid message event. Type assertion failed", + }, + ) continue } if !slices.Contains(supportedEvents.values, messageEvent) { - resp = transformer.TransformerResponse{Output: event.Message, StatusCode: types.FilterEventCode, Metadata: event.Metadata, Error: "Event not supported"} - failedEvents = append(failedEvents, resp) + failedEvents = append( + failedEvents, + transformer.TransformerResponse{ + Output: event.Message, + StatusCode: types.FilterEventCode, + Metadata: event.Metadata, + Error: "Event not supported", + }, + ) continue } } } // allow event - resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 200, Metadata: event.Metadata} - responses = append(responses, resp) + responses = append( + responses, + transformer.TransformerResponse{ + Output: event.Message, + StatusCode: 200, + Metadata: event.Metadata, + }, + ) } return transformer.Response{Events: responses, FailedEvents: failedEvents} diff --git a/processor/processor_test.go b/processor/processor_test.go index d952226a1b4..3b1b35e090b 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -3480,6 +3480,75 @@ var _ = Describe("Static Function Tests", func() { response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) }) + + It("filters events if jobRunID is configured to abort", func() { + destinationConfig := backendconfig.DestinationT{ + DestinationDefinition: backendconfig.DestinationDefinitionT{ + Config: map[string]interface{}{}, + }, + } + config.Set("RSources.toAbortJobRunIDs", "abortJobRunID") + + events := []transformer.TransformerEvent{ + { + Metadata: transformer.Metadata{ + MessageID: "message-1", + SourceJobRunID: "abortJobRunID", + }, + Message: map[string]interface{}{ + "some-key-1": "some-value-1", + "type": " IDENTIFY ", + }, + Destination: destinationConfig, + }, + { + Metadata: transformer.Metadata{ + MessageID: "message-2", + }, + Message: map[string]interface{}{ + "some-key-2": "some-value-2", + "type": "track", + }, + Destination: destinationConfig, + }, + { + Metadata: transformer.Metadata{ + MessageID: "message-2", + }, + Message: map[string]interface{}{ + "some-key-2": "some-value-2", + "type": 123, + }, + Destination: destinationConfig, + }, + } + + expectedResponse := transformer.Response{ + Events: []transformer.TransformerResponse{ + { + Output: events[1].Message, + StatusCode: 200, + Metadata: events[1].Metadata, + }, + { + Output: events[2].Message, + StatusCode: 200, + Metadata: events[2].Metadata, + }, + }, + FailedEvents: []transformer.TransformerResponse{ + { + Output: events[0].Message, + StatusCode: 400, + Metadata: events[0].Metadata, + Error: "jobRunID configured to abort", + }, + }, + } + response := ConvertToFilteredTransformerResponse(events, true) + Expect(response).To(Equal(expectedResponse)) + + }) }) }) diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go index 5de86817560..b31c93bb14c 100644 --- a/router/batchrouter/handle.go +++ b/router/batchrouter/handle.go @@ -83,6 +83,7 @@ type Handle struct { mainLoopFreq misc.ValueLoader[time.Duration] disableEgress bool toAbortDestinationIDs misc.ValueLoader[string] + toAbortJobRunIDs misc.ValueLoader[string] warehouseServiceMaxRetryTime misc.ValueLoader[time.Duration] transformerURL string datePrefixOverride misc.ValueLoader[string] diff --git a/router/batchrouter/handle_lifecycle.go b/router/batchrouter/handle_lifecycle.go index f4488eb79e5..b4a18846372 100644 --- a/router/batchrouter/handle_lifecycle.go +++ b/router/batchrouter/handle_lifecycle.go @@ -199,6 +199,7 @@ func (brt *Handle) setupReloadableVars() { brt.uploadFreq = config.GetReloadableDurationVar(30, time.Second, "BatchRouter.uploadFreqInS", "BatchRouter.uploadFreq") brt.mainLoopFreq = config.GetReloadableDurationVar(30, time.Second, "BatchRouter.mainLoopFreq") brt.toAbortDestinationIDs = config.GetReloadableStringVar("", "BatchRouter.toAbortDestinationIDs") + brt.toAbortJobRunIDs = config.GetReloadableStringVar("", "RSources.toAbortJobRunIDs") brt.warehouseServiceMaxRetryTime = config.GetReloadableDurationVar(3, time.Hour, "BatchRouter.warehouseServiceMaxRetryTime", "BatchRouter.warehouseServiceMaxRetryTimeinHr") brt.datePrefixOverride = config.GetReloadableStringVar("", "BatchRouter.datePrefixOverride") brt.customDatePrefix = config.GetReloadableStringVar("", "BatchRouter.customDatePrefix") diff --git a/router/batchrouter/worker.go b/router/batchrouter/worker.go index 18c3648f9d7..fc91fe0c468 100644 --- a/router/batchrouter/worker.go +++ b/router/batchrouter/worker.go @@ -76,7 +76,15 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin for _, job := range destinationJobs.jobs { var params router_utils.JobParameters _ = json.Unmarshal(job.Parameters, ¶ms) - if drain, reason := router_utils.ToBeDrained(job, params, router_utils.AbortConfigs{ToAbortDestinationIDs: brt.toAbortDestinationIDs.Load()}, destinationsMap); drain { + if drain, reason := router_utils.ToBeDrained( + job, + params, + router_utils.AbortConfigs{ + DestinationIDs: brt.toAbortDestinationIDs.Load(), + JobRunIDs: brt.toAbortJobRunIDs.Load(), + }, + destinationsMap, + ); drain { status := jobsdb.JobStatusT{ JobID: job.JobID, AttemptNum: job.LastJobStatus.AttemptNum + 1, diff --git a/router/handle_lifecycle.go b/router/handle_lifecycle.go index 738b71f7c35..8aa036c5594 100644 --- a/router/handle_lifecycle.go +++ b/router/handle_lifecycle.go @@ -264,6 +264,7 @@ func (rt *Handle) setupReloadableVars() { rt.reloadableConfig.minRetryBackoff = config.GetReloadableDurationVar(10, time.Second, "Router.minRetryBackoff", "Router.minRetryBackoffInS") rt.reloadableConfig.maxRetryBackoff = config.GetReloadableDurationVar(300, time.Second, "Router.maxRetryBackoff", "Router.maxRetryBackoffInS") rt.reloadableConfig.toAbortDestinationIDs = config.GetReloadableStringVar("", "Router.toAbortDestinationIDs") + rt.reloadableConfig.toAbortJobRunIDs = config.GetReloadableStringVar("", "RSources.toAbortJobRunIDs") rt.reloadableConfig.pickupFlushInterval = config.GetReloadableDurationVar(2, time.Second, "Router.pickupFlushInterval") rt.reloadableConfig.failingJobsPenaltySleep = config.GetReloadableDurationVar(2000, time.Millisecond, "Router.failingJobsPenaltySleep") rt.reloadableConfig.failingJobsPenaltyThreshold = config.GetReloadableFloat64Var(0.6, "Router.failingJobsPenaltyThreshold") diff --git a/router/router_test.go b/router/router_test.go index 8e82d6aa96e..5c578c5dfbe 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -603,6 +603,85 @@ var _ = Describe("router", func() { Eventually(func() bool { return routerAborted && procErrorStored }, 5*time.Second, 100*time.Millisecond).Should(Equal(true)) }) + It("aborts jobs that bear a abort configured jobRunId", func() { + config.Set("RSources.toAbortJobRunIDs", "someJobRunId") + router := &Handle{ + Reporting: &reporting.NOOP{}, + } + c.mockBackendConfig.EXPECT().AccessToken().AnyTimes() + + router.Setup(gaDestinationDefinition, logger.NOP, conf, c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, transientsource.NewEmptyService(), rsources.NewNoOpService(), destinationdebugger.NewNoOpService()) + mockNetHandle := mocksRouter.NewMockNetHandle(c.mockCtrl) + router.netHandle = mockNetHandle + + gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}` + parameters := fmt.Sprintf(`{"source_job_run_id": "someJobRunId", "source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "processor"}`, gaDestinationID) // skipcq: GO-R4002 + + unprocessedJobsList := []*jobsdb.JobT{ + { + UUID: uuid.New(), + UserID: "u1", + JobID: 2010, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: customVal["GA"], + EventPayload: []byte(gaPayload), + LastJobStatus: jobsdb.JobStatusT{ + AttemptNum: 0, + }, + Parameters: []byte(parameters), + WorkspaceId: workspaceID, + }, + } + + payloadLimit := router.reloadableConfig.payloadLimit + c.mockRouterJobsDB.EXPECT().GetToProcess(gomock.Any(), jobsdb.GetQueryParams{ + CustomValFilters: []string{customVal["GA"]}, + ParameterFilters: []jobsdb.ParameterFilterT{{Name: "destination_id", Value: gaDestinationID}}, + PayloadSizeLimit: payloadLimit.Load(), + JobsLimit: 10000, + }, nil).Times(1).Return(&jobsdb.MoreJobsResult{JobsResult: jobsdb.JobsResult{Jobs: unprocessedJobsList}}, nil) + + var routerAborted bool + var procErrorStored bool + + c.mockRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1) + + c.mockProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1). + Do(func(ctx context.Context, jobList []*jobsdb.JobT) { + job := jobList[0] + var parameters map[string]interface{} + err := json.Unmarshal(job.Parameters, ¶meters) + if err != nil { + panic(err) + } + + Expect(job.JobID).To(Equal(unprocessedJobsList[0].JobID)) + Expect(job.CustomVal).To(Equal(unprocessedJobsList[0].CustomVal)) + Expect(job.UserID).To(Equal(unprocessedJobsList[0].UserID)) + procErrorStored = true + }) + + c.mockRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { + _ = f(jobsdb.EmptyUpdateSafeTx()) + }).Return(nil).Times(1) + + c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1). + Do(func(ctx context.Context, tx jobsdb.UpdateSafeTx, drainList []*jobsdb.JobStatusT, _, _ interface{}) { + Expect(drainList).To(HaveLen(1)) + assertJobStatus(unprocessedJobsList[0], drainList[0], jobsdb.Aborted.State, "410", `{"reason": "job expired"}`, 0) + routerAborted = true + }) + + <-router.backendConfigInitialized + worker := newPartitionWorker(context.Background(), router, gaDestinationID) + defer worker.Stop() + Expect(worker.Work()).To(BeTrue()) + Expect(worker.pickupCount).To(Equal(len(unprocessedJobsList))) + Eventually(func() bool { return routerAborted && procErrorStored }, 5*time.Second, 100*time.Millisecond).Should(Equal(true)) + + }) + It("aborts events that have reached max retries", func() { config.Set("Router.jobRetention", "24h") mockNetHandle := mocksRouter.NewMockNetHandle(c.mockCtrl) diff --git a/router/types.go b/router/types.go index ff39c71cf62..cbaffdb3511 100644 --- a/router/types.go +++ b/router/types.go @@ -62,7 +62,6 @@ type reloadableConfig struct { jobsBatchTimeout misc.ValueLoader[time.Duration] failingJobsPenaltyThreshold misc.ValueLoader[float64] failingJobsPenaltySleep misc.ValueLoader[time.Duration] - toAbortDestinationIDs misc.ValueLoader[string] noOfJobsToBatchInAWorker misc.ValueLoader[int] jobsDBCommandTimeout misc.ValueLoader[time.Duration] jobdDBMaxRetries misc.ValueLoader[int] @@ -78,4 +77,7 @@ type reloadableConfig struct { transformerProxy misc.ValueLoader[bool] skipRtAbortAlertForTransformation misc.ValueLoader[bool] // represents if event delivery(via transformerProxy) should be alerted via router-aborted-count alert def skipRtAbortAlertForDelivery misc.ValueLoader[bool] // represents if transformation(router or batch) should be alerted via router-aborted-count alert def + + toAbortDestinationIDs misc.ValueLoader[string] + toAbortJobRunIDs misc.ValueLoader[string] } diff --git a/router/utils/utils.go b/router/utils/utils.go index f5b6b5f4b0a..3486de992eb 100644 --- a/router/utils/utils.go +++ b/router/utils/utils.go @@ -1,11 +1,10 @@ package utils import ( + "slices" "strings" "time" - "golang.org/x/exp/slices" - "github.com/tidwall/sjson" "github.com/rudderlabs/rudder-go-kit/config" @@ -67,14 +66,17 @@ type JobParameters struct { } type AbortConfigs struct { - ToAbortDestinationIDs string - // source - // connection - // jobRunID + DestinationIDs string + JobRunIDs string // ... } -func ToBeDrained(job *jobsdb.JobT, jobParams JobParameters, abortConfig AbortConfigs, destinationsMap map[string]*DestinationWithSources) (bool, string) { +func ToBeDrained( + job *jobsdb.JobT, + jobParams JobParameters, + abortConfig AbortConfigs, + destinationsMap map[string]*DestinationWithSources, +) (bool, string) { // drain if job is older than the destination's retention time createdAt := job.CreatedAt destID := jobParams.DestinationID @@ -86,18 +88,19 @@ func ToBeDrained(job *jobsdb.JobT, jobParams JobParameters, abortConfig AbortCon return true, "destination is disabled" } - toAbortDestinationIDs := abortConfig.ToAbortDestinationIDs - if toAbortDestinationIDs != "" { - abortIDs := strings.Split(toAbortDestinationIDs, ",") + if abortConfig.DestinationIDs != "" { + abortIDs := strings.Split(abortConfig.DestinationIDs, ",") if slices.Contains(abortIDs, destID) { return true, "destination configured to abort" } } - // toAbortSourceIDS - // toAbortConnection(sourceID, destID) - // toAbortJobRunID/TaskRunID - // ... + if abortConfig.JobRunIDs != "" { + abortIDs := strings.Split(abortConfig.JobRunIDs, ",") + if slices.Contains(abortIDs, jobParams.SourceJobRunID) { + return true, "jobRunID configured to abort" + } + } return false, "" } diff --git a/router/worker.go b/router/worker.go index fb8d8ef9d9a..6cf3a025262 100644 --- a/router/worker.go +++ b/router/worker.go @@ -90,7 +90,15 @@ func (w *worker) workLoop() { panic(fmt.Errorf("unmarshalling of job parameters failed for job %d (%s): %w", job.JobID, string(job.Parameters), err)) } w.rt.destinationsMapMu.RLock() - abort, abortReason := routerutils.ToBeDrained(job, parameters, routerutils.AbortConfigs{ToAbortDestinationIDs: w.rt.reloadableConfig.toAbortDestinationIDs.Load()}, w.rt.destinationsMap) + abort, abortReason := routerutils.ToBeDrained( + job, + parameters, + routerutils.AbortConfigs{ + DestinationIDs: w.rt.reloadableConfig.toAbortDestinationIDs.Load(), + JobRunIDs: w.rt.reloadableConfig.toAbortJobRunIDs.Load(), + }, + w.rt.destinationsMap, + ) abortTag := abortReason w.rt.destinationsMapMu.RUnlock() if !abort {