diff --git a/processor/processor_test.go b/processor/processor_test.go
index 08de0600f1..d876de931d 100644
--- a/processor/processor_test.go
+++ b/processor/processor_test.go
@@ -5495,10 +5495,32 @@ func TestStoreMessageMerge(t *testing.T) {
 		trackedUsersReports: []*trackedusers.UsersReport{{WorkspaceID: sampleWorkspaceID}, {WorkspaceID: sampleWorkspaceID}},
 	}
 
+	sm3 := &storeMessage{
+		[]*trackedusers.UsersReport{{WorkspaceID: sampleWorkspaceID}, {WorkspaceID: sampleWorkspaceID}},
+		[]*jobsdb.JobStatusT{{JobID: 3}},
+		[]*jobsdb.JobT{{JobID: 3}},
+		[]*jobsdb.JobT{{JobID: 3}},
+		[]*jobsdb.JobT{{JobID: 3}},
+		map[string][]*jobsdb.JobT{
+			"3": {{JobID: 3}},
+		},
+		[]*jobsdb.JobT{{JobID: 3}},
+		[]string{"3"},
+		[]*types.PUReportedMetric{{}},
+		map[dupStatKey]int{{sourceID: "1"}: 3},
+		map[string]struct{}{"3": {}},
+		1,
+		time.Time{},
+		false,
+		nil,
+		map[string]stats.Tags{},
+	}
+
 	merged := storeMessage{
 		procErrorJobsByDestID: map[string][]*jobsdb.JobT{},
 		sourceDupStats:        map[dupStatKey]int{},
 		dedupKeys:             map[string]struct{}{},
+		start:                 time.UnixMicro(99999999),
 	}
 
 	merged.merge(sm1)
@@ -5527,4 +5549,31 @@ func TestStoreMessageMerge(t *testing.T) {
 	require.Len(t, merged.dedupKeys, 2, "dedup keys should have 2 elements")
 	require.Equal(t, merged.totalEvents, 2, "total events should be 2")
 	require.Len(t, merged.trackedUsersReports, 3, "trackedUsersReports should have 3 elements")
+
+	merged.merge(sm3)
+	require.Equal(t, merged, storeMessage{
+		trackedUsersReports: []*trackedusers.UsersReport{
+			{WorkspaceID: sampleWorkspaceID},
+			{WorkspaceID: sampleWorkspaceID},
+			{WorkspaceID: sampleWorkspaceID},
+			{WorkspaceID: sampleWorkspaceID},
+			{WorkspaceID: sampleWorkspaceID},
+		},
+		statusList:    []*jobsdb.JobStatusT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
+		destJobs:      []*jobsdb.JobT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
+		batchDestJobs: []*jobsdb.JobT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
+		droppedJobs:   []*jobsdb.JobT{{JobID: 3}},
+		procErrorJobsByDestID: map[string][]*jobsdb.JobT{
+			"1": {{JobID: 1}},
+			"2": {{JobID: 2}},
+			"3": {{JobID: 3}},
+		},
+		procErrorJobs:  []*jobsdb.JobT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
+		routerDestIDs:  []string{"1", "2", "3"},
+		reportMetrics:  []*types.PUReportedMetric{{}, {}, {}},
+		sourceDupStats: map[dupStatKey]int{{sourceID: "1"}: 6},
+		dedupKeys:      map[string]struct{}{"1": {}, "2": {}, "3": {}},
+		totalEvents:    3,
+		start:          time.UnixMicro(99999999),
+	})
 }
diff --git a/processor/worker_test.go b/processor/worker_test.go
index 49119a031c..e4f2df4529 100644
--- a/processor/worker_test.go
+++ b/processor/worker_test.go
@@ -7,6 +7,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
+
 	"github.com/stretchr/testify/require"
 
 	"github.com/rudderlabs/rudder-go-kit/config"
@@ -21,19 +23,22 @@ import (
 )
 
 func TestWorkerPool(t *testing.T) {
-	run := func(t *testing.T, pipelining, limitsReached bool) {
+	run := func(t *testing.T, pipelining, limitsReached, shouldProcessMultipleSubJobs bool) {
 		wh := &mockWorkerHandle{
 			pipelining: pipelining,
 			log:        logger.NOP,
 			loopEvents: 100,
 			partitionStats: map[string]struct {
-				queried     int
-				marked      int
-				processed   int
-				transformed int
-				stored      int
+				queried      int
+				marked       int
+				processed    int
+				transformed  int
+				stored       int
+				subBatches   int
+				trackedUsers int
 			}{},
-			limitsReached: limitsReached,
+			limitsReached:                limitsReached,
+			shouldProcessMultipleSubJobs: shouldProcessMultipleSubJobs,
 		}
 
 		ctx, cancel := context.WithCancel(context.Background())
@@ -84,19 +89,25 @@ func TestWorkerPool(t *testing.T) {
 	t.Run("work without pipelining", func(t *testing.T) {
pipelining", func(t *testing.T) { t.Run("limits not reached", func(t *testing.T) { - run(t, false, false) + run(t, false, false, false) }) t.Run("limits reached", func(t *testing.T) { - run(t, false, true) + run(t, false, true, false) }) }) t.Run("work with pipelining", func(t *testing.T) { t.Run("limits not reached", func(t *testing.T) { - run(t, true, false) + run(t, true, false, false) }) t.Run("limits reached", func(t *testing.T) { - run(t, true, true) + run(t, true, true, false) + }) + t.Run("limits reached with multiple sub jobs", func(t *testing.T) { + run(t, true, true, true) + }) + t.Run("limits not reached with multiple sub jobs", func(t *testing.T) { + run(t, true, false, true) }) }) } @@ -107,11 +118,13 @@ func TestWorkerPoolIdle(t *testing.T) { log: logger.NewLogger(), loopEvents: 0, partitionStats: map[string]struct { - queried int - marked int - processed int - transformed int - stored int + queried int + marked int + processed int + transformed int + stored int + subBatches int + trackedUsers int }{}, } poolCtx, poolCancel := context.WithCancel(context.Background()) @@ -144,11 +157,13 @@ type mockWorkerHandle struct { statsMu sync.RWMutex log logger.Logger partitionStats map[string]struct { - queried int - marked int - processed int - transformed int - stored int + queried int + marked int + processed int + transformed int + stored int + subBatches int + trackedUsers int } limiters struct { @@ -158,7 +173,8 @@ type mockWorkerHandle struct { store kitsync.Limiter } - limitsReached bool + limitsReached bool + shouldProcessMultipleSubJobs bool } func (m *mockWorkerHandle) tracer() stats.Tracer { @@ -173,6 +189,7 @@ func (m *mockWorkerHandle) validate(t *testing.T) { require.Equalf(t, s.marked, s.processed, "Partition %s: Marked %d, Processed %d", partition, s.queried, s.marked) require.Equalf(t, s.processed, s.transformed, "Partition %s: Processed %d, Transformed %d", partition, s.queried, s.marked) require.Equalf(t, s.transformed, s.stored, "Partition %s: Transformed %d, Stored %d", partition, s.queried, s.marked) + require.Equalf(t, s.subBatches, s.trackedUsers, "Partition %s: Tracked Users %d, Subjobs %d", partition, s.trackedUsers, s.subBatches) } } @@ -259,10 +276,29 @@ func (m *mockWorkerHandle) markExecuting(partition string, jobs []*jobsdb.JobT) return nil } -func (*mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob { +func (m *mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob { + if !m.shouldProcessMultipleSubJobs { + return []subJob{ + { + subJobs: jobs, + hasMore: false, + rsourcesStats: rsourcesStats, + }, + } + } return []subJob{ { - subJobs: jobs, + subJobs: jobs[0 : len(jobs)/3], + hasMore: true, + rsourcesStats: rsourcesStats, + }, + { + subJobs: jobs[len(jobs)/3 : 2*len(jobs)/2], + hasMore: true, + rsourcesStats: rsourcesStats, + }, + { + subJobs: jobs[2*len(jobs)/2:], hasMore: false, rsourcesStats: rsourcesStats, }, @@ -277,11 +313,16 @@ func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob) defer m.statsMu.Unlock() s := m.partitionStats[partition] s.processed += len(subJobs.subJobs) + s.subBatches += 1 m.partitionStats[partition] = s m.log.Infof("processJobsForDest partition: %s stats: %+v", partition, s) return &transformationMessage{ totalEvents: len(subJobs.subJobs), + hasMore: subJobs.hasMore, + trackedUsersReports: []*trackedusers.UsersReport{ + {WorkspaceID: sampleWorkspaceID}, + }, } } @@ -297,7 +338,9 @@ func (m *mockWorkerHandle) 
 	m.log.Infof("transformations partition: %s stats: %+v", partition, s)
 
 	return &storeMessage{
-		totalEvents: in.totalEvents,
+		totalEvents:         in.totalEvents,
+		hasMore:             in.hasMore,
+		trackedUsersReports: in.trackedUsersReports,
 	}
 }
 
@@ -309,6 +352,7 @@ func (m *mockWorkerHandle) Store(partition string, in *storeMessage) {
 	defer m.statsMu.Unlock()
 	s := m.partitionStats[partition]
 	s.stored += in.totalEvents
+	s.trackedUsers += len(in.trackedUsersReports)
 	m.partitionStats[partition] = s
 	m.log.Infof("Store partition: %s stats: %+v", partition, s)
 }
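
Note for reviewers: taken together, the assertions in TestStoreMessageMerge pin down the merge contract fairly tightly. The sketch below is inferred purely from those expectations and is not the actual implementation; it omits hasMore, rsourcesStats and traces, which the real method must also handle, and the rule for start is an assumption based on merged.start surviving sm3's zero time.Time{}.

// Hedged sketch of storeMessage.merge, reconstructed from the test expectations
// above; not the implementation shipped in the processor package.
func (sm *storeMessage) merge(other *storeMessage) {
	// Slices are concatenated in arrival order: after merging sm1, sm2 and sm3
	// the test sees JobIDs 1, 2, 3 in statusList, destJobs, batchDestJobs, etc.
	sm.trackedUsersReports = append(sm.trackedUsersReports, other.trackedUsersReports...)
	sm.statusList = append(sm.statusList, other.statusList...)
	sm.destJobs = append(sm.destJobs, other.destJobs...)
	sm.batchDestJobs = append(sm.batchDestJobs, other.batchDestJobs...)
	sm.droppedJobs = append(sm.droppedJobs, other.droppedJobs...)
	sm.procErrorJobs = append(sm.procErrorJobs, other.procErrorJobs...)
	sm.routerDestIDs = append(sm.routerDestIDs, other.routerDestIDs...)
	sm.reportMetrics = append(sm.reportMetrics, other.reportMetrics...)

	// Maps are merged key-wise; the receiver needs non-nil maps, which is why
	// the test initialises merged with empty map literals.
	for destID, jobs := range other.procErrorJobsByDestID {
		sm.procErrorJobsByDestID[destID] = append(sm.procErrorJobsByDestID[destID], jobs...)
	}
	for key, count := range other.sourceDupStats {
		sm.sourceDupStats[key] += count // 3 + 3 = 6 for {sourceID: "1"} in the test
	}
	for key := range other.dedupKeys {
		sm.dedupKeys[key] = struct{}{} // set union
	}

	sm.totalEvents += other.totalEvents // 2 + 1 = 3 in the test

	// Assumption: a zero start never overwrites an existing one, so merged.start
	// keeps time.UnixMicro(99999999) after merging sm3.
	if sm.start.IsZero() || (!other.start.IsZero() && other.start.Before(sm.start)) {
		sm.start = other.start
	}
}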