From a4fd722b8b400e4e18e468aed71791dc4911ed08 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Wed, 11 Oct 2023 18:54:30 +0530 Subject: [PATCH] fixup! chore: update processor logic to accomodate jobRunID aborts --- processor/manager.go | 38 +++++-- processor/processor.go | 61 +++++++----- processor/processor_test.go | 194 ++++++++++++++++++++---------------- 3 files changed, 174 insertions(+), 119 deletions(-) diff --git a/processor/manager.go b/processor/manager.go index 926cf31bf77..7678bca7b4c 100644 --- a/processor/manager.go +++ b/processor/manager.go @@ -54,8 +54,20 @@ func (proc *LifecycleManager) Start() error { } proc.Handle.Setup( - proc.BackendConfig, proc.gatewayDB, proc.routerDB, proc.batchRouterDB, proc.readErrDB, proc.writeErrDB, proc.esDB, proc.arcDB, - proc.ReportingI, proc.transientSources, proc.fileuploader, proc.rsourcesService, proc.destDebugger, proc.transDebugger, + proc.BackendConfig, + proc.gatewayDB, + proc.routerDB, + proc.batchRouterDB, + proc.readErrDB, + proc.writeErrDB, + proc.esDB, + proc.arcDB, + proc.ReportingI, + proc.transientSources, + proc.fileuploader, + proc.rsourcesService, + proc.destDebugger, + proc.transDebugger, ) currentCtx, cancel := context.WithCancel(context.Background()) @@ -91,13 +103,27 @@ func WithFeaturesRetryMaxAttempts(maxAttempts int) func(l *LifecycleManager) { } // New creates a new Processor instance -func New(ctx context.Context, clearDb *bool, gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle, - reporting types.Reporting, transientSources transientsource.Service, fileuploader fileuploader.Provider, - rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, +func New( + ctx context.Context, + clearDb *bool, + gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle, + reporting types.Reporting, + transientSources transientsource.Service, + fileuploader fileuploader.Provider, + rsourcesService rsources.JobService, + destDebugger destinationdebugger.DestinationDebugger, + transDebugger transformationdebugger.TransformationDebugger, opts ...Opts, ) *LifecycleManager { proc := &LifecycleManager{ - Handle: NewHandle(transformer.NewTransformer(config.Default, logger.NewLogger().Child("processor"), stats.Default)), + Handle: NewHandle( + config.Default, + transformer.NewTransformer( + config.Default, + logger.NewLogger().Child("processor"), + stats.Default, + ), + ), mainCtx: ctx, gatewayDB: gwDb, routerDB: rtDb, diff --git a/processor/processor.go b/processor/processor.go index fc677829af9..faeccf6c151 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -60,14 +60,15 @@ const ( var jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary -func NewHandle(transformer transformer.Transformer) *Handle { - h := &Handle{transformer: transformer} +func NewHandle(c *config.Config, transformer transformer.Transformer) *Handle { + h := &Handle{transformer: transformer, conf: c} h.loadConfig() return h } // Handle is a handle to the processor module type Handle struct { + conf *config.Config backendConfig backendconfig.BackendConfig transformer transformer.Transformer lastJobID int64 @@ -142,6 +143,10 @@ type Handle struct { eventAuditEnabled map[string]bool } + drainConfig struct { + jobRunIDs misc.ValueLoader[[]string] + } + adaptiveLimit func(int64) int64 storePlocker kitsync.PartitionLocker } @@ -348,15 +353,24 @@ func (proc *Handle) newEventFilterStat(sourceID, workspaceID string, destination // Setup initializes the module func (proc *Handle) Setup( - backendConfig backendconfig.BackendConfig, gatewayDB, routerDB, - batchRouterDB, readErrorDB, writeErrorDB, eventSchemaDB, archivalDB jobsdb.JobsDB, reporting types.Reporting, + backendConfig backendconfig.BackendConfig, + gatewayDB, routerDB, batchRouterDB, + readErrorDB, writeErrorDB, + eventSchemaDB, archivalDB jobsdb.JobsDB, + reporting types.Reporting, transientSources transientsource.Service, - fileuploader fileuploader.Provider, rsourcesService rsources.JobService, destDebugger destinationdebugger.DestinationDebugger, transDebugger transformationdebugger.TransformationDebugger, + fileuploader fileuploader.Provider, + rsourcesService rsources.JobService, + destDebugger destinationdebugger.DestinationDebugger, + transDebugger transformationdebugger.TransformationDebugger, ) { proc.reporting = reporting proc.destDebugger = destDebugger proc.transDebugger = transDebugger proc.reportingEnabled = config.GetBoolVar(types.DefaultReportingEnabled, "Reporting.enabled") + if proc.conf == nil { + proc.conf = config.Default + } proc.setupReloadableVars() proc.logger = logger.NewLogger().Child("processor") proc.backendConfig = backendConfig @@ -475,9 +489,10 @@ func (proc *Handle) Setup( } func (proc *Handle) setupReloadableVars() { - proc.jobdDBQueryRequestTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout") - proc.jobsDBCommandTimeout = config.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout") - proc.jobdDBMaxRetries = config.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries") + proc.jobdDBQueryRequestTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.QueryRequestTimeout", "JobsDB.QueryRequestTimeout") + proc.jobsDBCommandTimeout = proc.conf.GetReloadableDurationVar(600, time.Second, "JobsDB.Processor.CommandRequestTimeout", "JobsDB.CommandRequestTimeout") + proc.jobdDBMaxRetries = proc.conf.GetReloadableIntVar(2, 1, "JobsDB.Processor.MaxRetries", "JobsDB.MaxRetries") + proc.drainConfig.jobRunIDs = proc.conf.GetReloadableStringSliceVar([]string{}, "RSources.toAbortJobRunIDs") } // Start starts this processor's main loops. @@ -1595,6 +1610,19 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf if !proc.isDestinationAvailable(singularEvent, sourceId) { continue } + // check if jobRunId is cancelled + if len(proc.drainConfig.jobRunIDs.Load()) > 0 { + if slices.Contains( + proc.drainConfig.jobRunIDs.Load(), + commonMetadataFromSingularEvent.SourceJobRunID, + ) { + proc.logger.Debugf( + "cancelled jobRunID: %s", + commonMetadataFromSingularEvent.SourceJobRunID, + ) + continue + } + } if _, ok := groupedEventsBySourceId[SourceIDT(sourceId)]; !ok { groupedEventsBySourceId[SourceIDT(sourceId)] = make([]transformer.TransformerEvent, 0) @@ -2582,27 +2610,10 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, supportedMessageTypesCache := make(map[string]*cacheValue) supportedMessageEventsCache := make(map[string]*cacheValue) - toAbortJobRunIDs := config.GetStringVar("", "RSources.toAbortJobRunIDs") - // filter unsupported message types for i := range events { event := &events[i] - if toAbortJobRunIDs != "" { - abortIDs := strings.Split(toAbortJobRunIDs, ",") - if slices.Contains(abortIDs, event.Metadata.SourceJobRunID) { - failedEvents = append(failedEvents, - transformer.TransformerResponse{ - Output: event.Message, - StatusCode: 400, - Metadata: event.Metadata, - Error: "jobRunID configured to abort", - }, - ) - continue - } - } - if filter { // filter unsupported message types supportedTypes, ok := supportedMessageTypesCache[event.Destination.ID] diff --git a/processor/processor_test.go b/processor/processor_test.go index 1af4389b32c..06bcb058514 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -655,7 +655,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). Times(0) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) Setup(processor, c, false, false) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -847,7 +847,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { Expect(jobs).To(HaveLen(2)) }) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) Setup(processor, c, false, false) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -996,7 +996,7 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). Times(0) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) Setup(processor, c, false, false) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -1047,7 +1047,7 @@ var _ = Describe("Processor", Ordered, func() { It("should initialize (no jobs to recover)", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) @@ -1076,7 +1076,7 @@ var _ = Describe("Processor", Ordered, func() { It("should recover after crash", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) @@ -1111,7 +1111,7 @@ var _ = Describe("Processor", Ordered, func() { It("should only send proper stats, if not pending jobs are returned", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) processor.Setup( c.mockBackendConfig, @@ -1275,7 +1275,7 @@ var _ = Describe("Processor", Ordered, func() { } mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed( gomock.Any(), jobsdb.GetQueryParams{ @@ -1483,7 +1483,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{ CustomValFilters: gatewayCustomVal, @@ -1664,7 +1664,7 @@ var _ = Describe("Processor", Ordered, func() { _ = f(jobsdb.EmptyUpdateSafeTx()) }).Return(nil).Times(1) c.mockGatewayJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Len(len(unprocessedJobsList)), gatewayCustomVal, nil).Times(1).After(callStoreRouter) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) Setup(processor, c, true, false) @@ -1771,7 +1771,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{ CustomValFilters: gatewayCustomVal, @@ -1908,7 +1908,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{ CustomValFilters: gatewayCustomVal, @@ -1995,7 +1995,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{ CustomValFilters: gatewayCustomVal, @@ -2034,13 +2034,99 @@ var _ = Describe("Processor", Ordered, func() { processorSetupAndAssertJobHandling(processor, c) }) + + It("should drop messages where jobRunID is cancelled", func() { + messages := map[string]mockEventData{ + "message-1": { + id: "1", + jobid: 1010, + originalTimestamp: "2000-01-02T01:23:45", + expectedOriginalTimestamp: "2000-01-02T01:23:45.000Z", + sentAt: "2000-01-02 01:23", + expectedSentAt: "2000-01-02T01:23:00.000Z", + expectedReceivedAt: "2001-01-02T02:23:45.000Z", + integrations: map[string]bool{"All": false, "enabled-destination-a-definition-display-name": true}, + }, + } + payload := createBatchPayload( + WriteKeyEnabledNoUT2, + "2001-01-02T02:23:45.000Z", + []mockEventData{ + messages["message-1"], + }, + createMessagePayload, + ) + payload, _ = sjson.SetBytes(payload, "batch.0.type", "identify") + + unprocessedJobsList := []*jobsdb.JobT{ + { + UUID: uuid.New(), + JobID: 1010, + CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC), + CustomVal: gatewayCustomVal[0], + EventPayload: payload, + LastJobStatus: jobsdb.JobStatusT{}, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT2), + }, + } + + var toRetryJobsList []*jobsdb.JobT + + c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) + + mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) + + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) + + c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), jobsdb.GetQueryParams{ + CustomValFilters: gatewayCustomVal, + JobsLimit: processor.config.maxEventsToProcess.Load(), + EventsLimit: processor.config.maxEventsToProcess.Load(), + PayloadSizeLimit: processor.payloadLimit.Load(), + }).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1) + + // Test transformer failure + mockTransformer.EXPECT().Transform(gomock.Any(), gomock.Len(0), gomock.Any()).Times(0) + + c.mockGatewayJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { + _ = f(jobsdb.EmptyUpdateSafeTx()) + }).Return(nil).Times(1) + c.mockGatewayJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Len(len(toRetryJobsList)+len(unprocessedJobsList)), gatewayCustomVal, nil).Times(1). + Do(func(ctx context.Context, txn jobsdb.UpdateSafeTx, statuses []*jobsdb.JobStatusT, _, _ interface{}) { + // job should be marked as successful regardless of transformer response + assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State) + }) + + c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), "job_run_id_1", gomock.Any(), rsources.Stats{Out: 1}).Times(1) + + c.mockArchivalDB.EXPECT().WithStoreSafeTx(gomock.Any(), gomock.Any()).AnyTimes().Do(func(ctx context.Context, f func(tx jobsdb.StoreSafeTx) error) { + _ = f(jobsdb.EmptyStoreSafeTx()) + }).Return(nil) + + c.mockArchivalDB.EXPECT(). + StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()). + Times(0) + + c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) { + _ = f(&jobsdb.Tx{}) + }).Return(nil).Times(0) + + // One Store call is expected for all events + c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(0). + Do(func(ctx context.Context, jobs []*jobsdb.JobT) {}) + + config.Set("RSources.toAbortJobRunIDs", "job_run_id_1") + defer config.Reset() + processorSetupAndAssertJobHandling(processor, c) + }) }) Context("MainLoop Tests", func() { It("Should not handle jobs when transformer features are not set", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) @@ -2097,7 +2183,7 @@ var _ = Describe("Processor", Ordered, func() { It("Should not handle jobs when transformer features are not set", func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) // crash recover returns empty list c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) @@ -2169,7 +2255,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) Setup(processor, c, false, false) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -2208,7 +2294,7 @@ var _ = Describe("Processor", Ordered, func() { mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) - processor := prepareHandle(NewHandle(mockTransformer)) + processor := prepareHandle(NewHandle(config.Default, mockTransformer)) Setup(processor, c, false, true) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -2423,7 +2509,7 @@ var _ = Describe("Static Function Tests", func() { Context("updateMetricMaps Tests", func() { It("Should update metric maps", func() { - proc := NewHandle(nil) + proc := NewHandle(config.Default, nil) proc.reportingEnabled = true proc.reporting = &mockReportingTypes.MockReporting{} @@ -3480,74 +3566,6 @@ var _ = Describe("Static Function Tests", func() { response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) }) - - It("filters events if jobRunID is configured to abort", func() { - destinationConfig := backendconfig.DestinationT{ - DestinationDefinition: backendconfig.DestinationDefinitionT{ - Config: map[string]interface{}{}, - }, - } - config.Set("RSources.toAbortJobRunIDs", "abortJobRunID") - - events := []transformer.TransformerEvent{ - { - Metadata: transformer.Metadata{ - MessageID: "message-1", - SourceJobRunID: "abortJobRunID", - }, - Message: map[string]interface{}{ - "some-key-1": "some-value-1", - "type": " IDENTIFY ", - }, - Destination: destinationConfig, - }, - { - Metadata: transformer.Metadata{ - MessageID: "message-2", - }, - Message: map[string]interface{}{ - "some-key-2": "some-value-2", - "type": "track", - }, - Destination: destinationConfig, - }, - { - Metadata: transformer.Metadata{ - MessageID: "message-2", - }, - Message: map[string]interface{}{ - "some-key-2": "some-value-2", - "type": 123, - }, - Destination: destinationConfig, - }, - } - - expectedResponse := transformer.Response{ - Events: []transformer.TransformerResponse{ - { - Output: events[1].Message, - StatusCode: 200, - Metadata: events[1].Metadata, - }, - { - Output: events[2].Message, - StatusCode: 200, - Metadata: events[2].Metadata, - }, - }, - FailedEvents: []transformer.TransformerResponse{ - { - Output: events[0].Message, - StatusCode: 400, - Metadata: events[0].Metadata, - Error: "jobRunID configured to abort", - }, - }, - } - response := ConvertToFilteredTransformerResponse(events, true) - Expect(response).To(Equal(expectedResponse)) - }) }) }) @@ -3824,7 +3842,7 @@ var _ = Describe("TestJobSplitter", func() { } Context("testing jobs splitter, which split jobs into some sub-jobs", func() { It("default subJobSize: 2k", func() { - proc := NewHandle(nil) + proc := NewHandle(config.Default, nil) expectedSubJobs := []subJob{ { subJobs: []*jobsdb.JobT{ @@ -3851,7 +3869,7 @@ var _ = Describe("TestJobSplitter", func() { Expect(proc.jobSplitter(jobs, nil)).To(Equal(expectedSubJobs)) }) It("subJobSize: 1, i.e. dividing read jobs into batch of 1", func() { - proc := NewHandle(nil) + proc := NewHandle(config.Default, nil) proc.config.subJobSize = 1 expectedSubJobs := []subJob{ { @@ -3898,7 +3916,7 @@ var _ = Describe("TestJobSplitter", func() { Expect(proc.jobSplitter(jobs, nil)).To(Equal(expectedSubJobs)) }) It("subJobSize: 2, i.e. dividing read jobs into batch of 2", func() { - proc := NewHandle(nil) + proc := NewHandle(config.Default, nil) proc.config.subJobSize = 2 expectedSubJobs := []subJob{ {