Skip to content

Commit

Permalink
chore: add tests for processor worker (#4884)
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 authored Jul 16, 2024
1 parent d962876 commit 9db1ea4
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 25 deletions.
49 changes: 49 additions & 0 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5495,10 +5495,32 @@ func TestStoreMessageMerge(t *testing.T) {
trackedUsersReports: []*trackedusers.UsersReport{{WorkspaceID: sampleWorkspaceID}, {WorkspaceID: sampleWorkspaceID}},
}

sm3 := &storeMessage{
[]*trackedusers.UsersReport{{WorkspaceID: sampleWorkspaceID}, {WorkspaceID: sampleWorkspaceID}},
[]*jobsdb.JobStatusT{{JobID: 3}},
[]*jobsdb.JobT{{JobID: 3}},
[]*jobsdb.JobT{{JobID: 3}},
[]*jobsdb.JobT{{JobID: 3}},
map[string][]*jobsdb.JobT{
"3": {{JobID: 3}},
},
[]*jobsdb.JobT{{JobID: 3}},
[]string{"3"},
[]*types.PUReportedMetric{{}},
map[dupStatKey]int{{sourceID: "1"}: 3},
map[string]struct{}{"3": {}},
1,
time.Time{},
false,
nil,
map[string]stats.Tags{},
}

merged := storeMessage{
procErrorJobsByDestID: map[string][]*jobsdb.JobT{},
sourceDupStats: map[dupStatKey]int{},
dedupKeys: map[string]struct{}{},
start: time.UnixMicro(99999999),
}

merged.merge(sm1)
Expand Down Expand Up @@ -5527,4 +5549,31 @@ func TestStoreMessageMerge(t *testing.T) {
require.Len(t, merged.dedupKeys, 2, "dedup keys should have 2 elements")
require.Equal(t, merged.totalEvents, 2, "total events should be 2")
require.Len(t, merged.trackedUsersReports, 3, "trackedUsersReports should have 3 element")

merged.merge(sm3)
require.Equal(t, merged, storeMessage{
trackedUsersReports: []*trackedusers.UsersReport{
{WorkspaceID: sampleWorkspaceID},
{WorkspaceID: sampleWorkspaceID},
{WorkspaceID: sampleWorkspaceID},
{WorkspaceID: sampleWorkspaceID},
{WorkspaceID: sampleWorkspaceID},
},
statusList: []*jobsdb.JobStatusT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
destJobs: []*jobsdb.JobT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
batchDestJobs: []*jobsdb.JobT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
droppedJobs: []*jobsdb.JobT{{JobID: 3}},
procErrorJobsByDestID: map[string][]*jobsdb.JobT{
"1": {{JobID: 1}},
"2": {{JobID: 2}},
"3": {{JobID: 3}},
},
procErrorJobs: []*jobsdb.JobT{{JobID: 1}, {JobID: 2}, {JobID: 3}},
routerDestIDs: []string{"1", "2", "3"},
reportMetrics: []*types.PUReportedMetric{{}, {}, {}},
sourceDupStats: map[dupStatKey]int{{sourceID: "1"}: 6},
dedupKeys: map[string]struct{}{"1": {}, "2": {}, "3": {}},
totalEvents: 3,
start: time.UnixMicro(99999999),
})
}
94 changes: 69 additions & 25 deletions processor/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -21,19 +23,22 @@ import (
)

func TestWorkerPool(t *testing.T) {
run := func(t *testing.T, pipelining, limitsReached bool) {
run := func(t *testing.T, pipelining, limitsReached, shouldProcessMultipleSubJobs bool) {
wh := &mockWorkerHandle{
pipelining: pipelining,
log: logger.NOP,
loopEvents: 100,
partitionStats: map[string]struct {
queried int
marked int
processed int
transformed int
stored int
queried int
marked int
processed int
transformed int
stored int
subBatches int
trackedUsers int
}{},
limitsReached: limitsReached,
limitsReached: limitsReached,
shouldProcessMultipleSubJobs: shouldProcessMultipleSubJobs,
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -84,19 +89,25 @@ func TestWorkerPool(t *testing.T) {

t.Run("work without pipelining", func(t *testing.T) {
t.Run("limits not reached", func(t *testing.T) {
run(t, false, false)
run(t, false, false, false)
})
t.Run("limits reached", func(t *testing.T) {
run(t, false, true)
run(t, false, true, false)
})
})

t.Run("work with pipelining", func(t *testing.T) {
t.Run("limits not reached", func(t *testing.T) {
run(t, true, false)
run(t, true, false, false)
})
t.Run("limits reached", func(t *testing.T) {
run(t, true, true)
run(t, true, true, false)
})
t.Run("limits reached with multiple sub jobs", func(t *testing.T) {
run(t, true, true, true)
})
t.Run("limits not reached with multiple sub jobs", func(t *testing.T) {
run(t, true, false, true)
})
})
}
Expand All @@ -107,11 +118,13 @@ func TestWorkerPoolIdle(t *testing.T) {
log: logger.NewLogger(),
loopEvents: 0,
partitionStats: map[string]struct {
queried int
marked int
processed int
transformed int
stored int
queried int
marked int
processed int
transformed int
stored int
subBatches int
trackedUsers int
}{},
}
poolCtx, poolCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -144,11 +157,13 @@ type mockWorkerHandle struct {
statsMu sync.RWMutex
log logger.Logger
partitionStats map[string]struct {
queried int
marked int
processed int
transformed int
stored int
queried int
marked int
processed int
transformed int
stored int
subBatches int
trackedUsers int
}

limiters struct {
Expand All @@ -158,7 +173,8 @@ type mockWorkerHandle struct {
store kitsync.Limiter
}

limitsReached bool
limitsReached bool
shouldProcessMultipleSubJobs bool
}

func (m *mockWorkerHandle) tracer() stats.Tracer {
Expand All @@ -173,6 +189,7 @@ func (m *mockWorkerHandle) validate(t *testing.T) {
require.Equalf(t, s.marked, s.processed, "Partition %s: Marked %d, Processed %d", partition, s.queried, s.marked)
require.Equalf(t, s.processed, s.transformed, "Partition %s: Processed %d, Transformed %d", partition, s.queried, s.marked)
require.Equalf(t, s.transformed, s.stored, "Partition %s: Transformed %d, Stored %d", partition, s.queried, s.marked)
require.Equalf(t, s.subBatches, s.trackedUsers, "Partition %s: Tracked Users %d, Subjobs %d", partition, s.trackedUsers, s.subBatches)
}
}

Expand Down Expand Up @@ -259,10 +276,29 @@ func (m *mockWorkerHandle) markExecuting(partition string, jobs []*jobsdb.JobT)
return nil
}

func (*mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob {
func (m *mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob {
if !m.shouldProcessMultipleSubJobs {
return []subJob{
{
subJobs: jobs,
hasMore: false,
rsourcesStats: rsourcesStats,
},
}
}
return []subJob{
{
subJobs: jobs,
subJobs: jobs[0 : len(jobs)/3],
hasMore: true,
rsourcesStats: rsourcesStats,
},
{
subJobs: jobs[len(jobs)/3 : 2*len(jobs)/2],
hasMore: true,
rsourcesStats: rsourcesStats,
},
{
subJobs: jobs[2*len(jobs)/2:],
hasMore: false,
rsourcesStats: rsourcesStats,
},
Expand All @@ -277,11 +313,16 @@ func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob)
defer m.statsMu.Unlock()
s := m.partitionStats[partition]
s.processed += len(subJobs.subJobs)
s.subBatches += 1
m.partitionStats[partition] = s
m.log.Infof("processJobsForDest partition: %s stats: %+v", partition, s)

return &transformationMessage{
totalEvents: len(subJobs.subJobs),
hasMore: subJobs.hasMore,
trackedUsersReports: []*trackedusers.UsersReport{
{WorkspaceID: sampleWorkspaceID},
},
}
}

Expand All @@ -297,7 +338,9 @@ func (m *mockWorkerHandle) transformations(partition string, in *transformationM
m.log.Infof("transformations partition: %s stats: %+v", partition, s)

return &storeMessage{
totalEvents: in.totalEvents,
totalEvents: in.totalEvents,
hasMore: in.hasMore,
trackedUsersReports: in.trackedUsersReports,
}
}

Expand All @@ -309,6 +352,7 @@ func (m *mockWorkerHandle) Store(partition string, in *storeMessage) {
defer m.statsMu.Unlock()
s := m.partitionStats[partition]
s.stored += in.totalEvents
s.trackedUsers += len(in.trackedUsersReports)
m.partitionStats[partition] = s
m.log.Infof("Store partition: %s stats: %+v", partition, s)
}

0 comments on commit 9db1ea4

Please sign in to comment.