Skip to content

Commit

Permalink
chore: add handling for alias events in tracked users report
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Jul 16, 2024
1 parent 9db1ea4 commit b48aabd
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
21 changes: 20 additions & 1 deletion enterprise/trackedusers/users_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
murmurSeed = 123

trackUsersTable = "tracked_users_reports"

eventTypeAlias = "alias"
)

type UsersReport struct {
Expand Down Expand Up @@ -120,7 +122,7 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc
}
userID := gjson.GetBytes(job.EventPayload, "batch.0.userId").String()
anonymousID := gjson.GetBytes(job.EventPayload, "batch.0.anonymousId").String()

eventType := gjson.GetBytes(job.EventPayload, "batch.0.type").String()
if userID == "" && anonymousID == "" {
u.log.Warn("both userID and anonymousID not found in job event payload", obskit.WorkspaceID(job.WorkspaceId),
logger.NewIntField("jobId", job.JobID))
Expand All @@ -143,6 +145,23 @@ func (u *UniqueUsersReporter) GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourc
combinedUserIDAnonymousID := combineUserIDAnonymousID(userID, anonymousID)
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], combinedUserIDAnonymousID, idTypeIdentifiedAnonymousID)
}

// for alias event we will be adding previousId to identifiedAnonymousID hll,
// so for calculating unique users we do not double count the user
// e.g. we receive events
// {type:track, anonymousID: anon1}
// {type:track, userID: user1}
// {type:track, userID: user2}
// {type:identify, userID: user1, anonymousID: anon1}
// {type:alias, previousId: user2, userID: user1}
// userHLL: {user1, user2}, anonHLL: {anon1}, identifiedAnonHLL: {user1-anon1, user2}
// cardinality: len(userHLL)+len(anonHLL)-len(identifiedAnonHLL): 2+1-2 = 1
if eventType == eventTypeAlias {
previousID := gjson.GetBytes(job.EventPayload, "batch.0.previousId").String()
if previousID != "" {
workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID] = u.recordIdentifier(workspaceSourceUserIdTypeMap[job.WorkspaceId][sourceID], previousID, idTypeIdentifiedAnonymousID)
}
}
}

if len(workspaceSourceUserIdTypeMap) == 0 {
Expand Down
79 changes: 79 additions & 0 deletions enterprise/trackedusers/users_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ var (
WorkspaceId: workspaceID,
}
}
prepareAliasJob = func(sourceID, userID, previousID, workspaceID string) *jobsdb.JobT {
return &jobsdb.JobT{
Parameters: []byte(fmt.Sprintf(`{"source_id":%q}`, sourceID)),
EventPayload: []byte(fmt.Sprintf(`{"batch": [{"previousId":%q,"userId":%q,"type":"alias"}]}`, previousID, userID)),
UserID: uuid.NewString(),
UUID: uuid.New(),
CustomVal: "GW",
WorkspaceId: workspaceID,
}
}
prepareUserReport = func(t *testing.T, sourceID, workspaceID string, noOfUserIDs, noOfAnnID, noOfIdentifiedAnnID int) *UsersReport {
userIDHll, _ := hll.NewHll(hllSettings)
annIDHll, _ := hll.NewHll(hllSettings)
Expand Down Expand Up @@ -195,6 +205,75 @@ func TestUniqueUsersReporter(t *testing.T) {
},
},
},
{
name: "happy case - alias jobs",
jobs: []*jobsdb.JobT{
prepareJob(sampleSourceID, "", "anon_id", sampleWorkspaceID),
prepareJob(sampleSourceID, "user_id", "", sampleWorkspaceID),
prepareJob(sampleSourceID, "user_id", "anon_id", sampleWorkspaceID),
prepareJob(sampleSourceID, "user_id_1", "anon_id_1", sampleWorkspaceID),
prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID),
prepareJob(sampleSourceID, "user", "ann", sampleWorkspaceID2),
prepareAliasJob(sampleSourceID, "user_id", "user_id_1", sampleWorkspaceID),
},
trackedUsers: []*UsersReport{
{
WorkspaceID: sampleWorkspaceID,
SourceID: sampleSourceID,
UserIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id"), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("anon_id_1"), murmurSeed))
return &resHll
}(),
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte(
combineUserIDAnonymousID("user_id", "anon_id")), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte(
combineUserIDAnonymousID("user_id_1", "anon_id_1")), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte(
combineUserIDAnonymousID("user", "ann")), murmurSeed))
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user_id_1"), murmurSeed))
return &resHll
}(),
},
{
WorkspaceID: sampleWorkspaceID2,
SourceID: sampleSourceID,
UserIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("user"), murmurSeed))
return &resHll
}(),
AnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte("ann"), murmurSeed))
return &resHll
}(),
IdentifiedAnonymousIDHll: func() *hll.Hll {
resHll, err := hll.NewHll(hllSettings)
require.NoError(t, err)
resHll.AddRaw(murmur3.Sum64WithSeed([]byte(
combineUserIDAnonymousID("user", "ann")), murmurSeed))
return &resHll
}(),
},
},
},
{
name: "happy case - no identified use ids",
jobs: []*jobsdb.JobT{
Expand Down

0 comments on commit b48aabd

Please sign in to comment.