diff --git a/enterprise/trackedusers/users_reporter.go b/enterprise/trackedusers/users_reporter.go index 5292882b97..0afce49908 100644 --- a/enterprise/trackedusers/users_reporter.go +++ b/enterprise/trackedusers/users_reporter.go @@ -35,6 +35,8 @@ const ( murmurSeed = 123 trackUsersTable = "tracked_users_reports" + + eventTypeAlias = "alias" ) type UsersReport struct { @@ -120,7 +122,7 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc } userID := gjson.GetBytes(job.EventPayload, "batch.0.userId").String() anonymousID := gjson.GetBytes(job.EventPayload, "batch.0.anonymousId").String() - + eventType := gjson.GetBytes(job.EventPayload, "batch.0.type").String() if userID == "" && anonymousID == "" { u.log.Warn("both userID and anonymousID not found in job event payload", obskit.WorkspaceID(job.WorkspaceId), logger.NewIntField("jobId", job.JobID)) @@ -143,6 +145,23 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc combinedUserIDAnonymousID := combineUserIDAnonymousID(userID, anonymousID) workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeIdentifiedAnonymousID) } + + // for alias event we will be adding previousId to identifiedAnonymousID hll, + // so for calculating unique users we do not double count the user + // e.g. we receive events + // {type:track, anonymousID: anon1} + // {type:track, userID: user1} + // {type:track, userID: user2} + // {type:identify, userID: user1, anonymousID: anon1} + // {type:alias, previousId: user2, userID: user1} + // userHLL: {user1, user2}, anonHLL: {anon1}, identifiedAnonHLL: {user1-anon1, user2} + // cardinality: len(userHLL)+len(anonHLL)-len(identifiedAnonHLL): 2+1-2 = 1 + if eventType == eventTypeAlias { + previousID := gjson.GetBytes(job.EventPayload, "batch.0.previousId").String() + if previousID != "" && previousID != userID && previousID != anonymousID { + workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], previousID, idTypeIdentifiedAnonymousID) + } + } } if len(workspaceSourceUserIdTypeMap) == 0 { diff --git a/enterprise/trackedusers/users_reporter_test.go b/enterprise/trackedusers/users_reporter_test.go index 7b76e4a17e..8de2fb462f 100644 --- a/enterprise/trackedusers/users_reporter_test.go +++ b/enterprise/trackedusers/users_reporter_test.go @@ -45,6 +45,16 @@ var ( WorkspaceId: workspaceID, } } + prepareAliasJob = func(sourceID, userID, previousID, workspaceID string) *jobsdb.JobT { + return &jobsdb.JobT{ + Parameters: []byte(fmt.Sprintf(`{"source_id":%q}`, sourceID)), + EventPayload: []byte(fmt.Sprintf(`{"batch": [{"previousId":%q,"userId":%q,"type":"alias"}]}`, previousID, userID)), + UserID: uuid.NewString(), + UUID: uuid.New(), + CustomVal: "GW", + WorkspaceId: workspaceID, + } + } prepareUserReport = func(t *testing.T, sourceID, workspaceID string, noOfUserIDs, noOfAnnID, noOfIdentifiedAnnID int) *UsersReport { userIDHll, _ := hll.NewHll(hllSettings) annIDHll, _ := hll.NewHll(hllSettings) @@ -195,6 +205,143 @@ func TestUniqueUsersReporter(t *testing.T) { }, }, }, + { + name: "happy case - alias jobs", + jobs: []*jobsdb.JobT{ + prepareJob(sampleSourceID, "", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id_1", "anon_id_1", sampleWorkspaceID), + prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID), + prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID2), + prepareAliasJob(sampleSourceID, "user_id", "user_id_1", sampleWorkspaceID), + }, + trackedUsers: []*UsersReport{ + { + WorkspaceID: sampleWorkspaceID, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id", "anon_id")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id_1", "anon_id_1")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed)) + return &resHll + }(), + }, + { + WorkspaceID: sampleWorkspaceID2, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + return &resHll + }(), + }, + }, + }, + { + name: "alias jobs with prevID same as userID", + jobs: []*jobsdb.JobT{ + prepareJob(sampleSourceID, "", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id", "anon_id", sampleWorkspaceID), + prepareJob(sampleSourceID, "user_id_1", "anon_id_1", sampleWorkspaceID), + prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID), + prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID2), + prepareAliasJob(sampleSourceID, "user_id", "user_id", sampleWorkspaceID), + }, + trackedUsers: []*UsersReport{ + { + WorkspaceID: sampleWorkspaceID, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id", "anon_id")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user_id_1", "anon_id_1")), murmurSeed)) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + return &resHll + }(), + }, + { + WorkspaceID: sampleWorkspaceID2, + SourceID: sampleSourceID, + UserIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed)) + return &resHll + }(), + AnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed)) + return &resHll + }(), + IdentifiedAnonymousIDHll: func() *hll.Hll { + resHll, err := hll.NewHll(hllSettings) + require.NoError(t, err) + resHll.AddRaw(murmur3.Sum64WithSeed([]byte( + combineUserIDAnonymousID("user", "ann")), murmurSeed)) + return &resHll + }(), + }, + }, + }, { name: "happy case - no identified use ids", jobs: []*jobsdb.JobT{