chore: fix fmt
Rohith BCS authored and Rohith BCS committed Oct 22, 2024
1 parent 5187fa2 commit 99e128d
Showing 2 changed files with 148 additions and 65 deletions.
206 changes: 145 additions & 61 deletions processor/processor.go
@@ -1595,6 +1595,33 @@ func (proc *Handle) eventAuditEnabled(workspaceID string) bool {
 	return proc.config.eventAuditEnabled[workspaceID]
 }

+type preTransformationMessage struct {
+	partition                    string
+	subJobs                      subJob
+	eventSchemaJobs              []*jobsdb.JobT
+	archivalJobs                 []*jobsdb.JobT
+	connectionDetailsMap         map[string]*types.ConnectionDetails
+	statusDetailsMap             map[string]map[string]*types.StatusDetail
+	reportMetrics                []*types.PUReportedMetric
+	destFilterStatusDetailMap    map[string]map[string]*types.StatusDetail
+	inCountMetadataMap           map[string]MetricMetadata
+	inCountMap                   map[string]int64
+	outCountMap                  map[string]int64
+	totalEvents                  int
+	marshalStart                 time.Time
+	groupedEventsBySourceId      map[SourceIDT][]transformer.TransformerEvent
+	eventsByMessageID            map[string]types.SingularEventWithReceivedAt
+	procErrorJobs                []*jobsdb.JobT
+	jobIDToSpecificDestMapOnly   map[int64]string
+	groupedEvents                map[string][]transformer.TransformerEvent
+	uniqueMessageIdsBySrcDestKey map[string]map[string]struct{}
+	statusList                   []*jobsdb.JobStatusT
+	jobList                      []*jobsdb.JobT
+	start                        time.Time
+	sourceDupStats               map[dupStatKey]int
+	dedupKeys                    map[string]struct{}
+}
+
 func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*transformationMessage, error) {
 	if proc.limiter.preprocess != nil {
 		defer proc.limiter.preprocess.BeginWithPriority(partition, proc.getLimiterPriority(partition))()
@@ -1745,9 +1772,14 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
 		}
 	}

-	keyMap, sizeMap, err := proc.dedup.GetBatch(dedupKeysWithWorkspaceID)
-	if err != nil {
-		return nil, err
+	var keyMap map[dedupTypes.KeyValue]bool
+	var sizeMap map[dedupTypes.KeyValue]int64
+	var err error
+	if proc.config.enableDedup {
+		keyMap, sizeMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID)
+		if err != nil {
+			return nil, err
+		}
 	}
 	for _, event := range jobsWithMetaData {
 		sourceId := event.eventParams.SourceId
@@ -1916,14 +1948,44 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
 		}
 	}

-	return proc.generateTransformationMessage(partition, subJobs, eventSchemaJobs, archivalJobs, connectionDetailsMap, statusDetailsMap, reportMetrics, destFilterStatusDetailMap, inCountMetadataMap, inCountMap, outCountMap, totalEvents, marshalStart, groupedEventsBySourceId, eventsByMessageID, procErrorJobs, jobIDToSpecificDestMapOnly, groupedEvents, uniqueMessageIdsBySrcDestKey, statusList, jobList, start, sourceDupStats, dedupKeys)
+	if len(statusList) != len(jobList) {
+		panic(fmt.Errorf("len(statusList):%d != len(jobList):%d", len(statusList), len(jobList)))
+	}
+
+	return proc.generateTransformationMessage(
+		&preTransformationMessage{
+			partition:                    partition,
+			subJobs:                      subJobs,
+			eventSchemaJobs:              eventSchemaJobs,
+			archivalJobs:                 archivalJobs,
+			connectionDetailsMap:         connectionDetailsMap,
+			statusDetailsMap:             statusDetailsMap,
+			reportMetrics:                reportMetrics,
+			destFilterStatusDetailMap:    destFilterStatusDetailMap,
+			inCountMetadataMap:           inCountMetadataMap,
+			inCountMap:                   inCountMap,
+			outCountMap:                  outCountMap,
+			totalEvents:                  totalEvents,
+			marshalStart:                 marshalStart,
+			groupedEventsBySourceId:      groupedEventsBySourceId,
+			eventsByMessageID:            eventsByMessageID,
+			procErrorJobs:                procErrorJobs,
+			jobIDToSpecificDestMapOnly:   jobIDToSpecificDestMapOnly,
+			groupedEvents:                groupedEvents,
+			uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey,
+			statusList:                   statusList,
+			jobList:                      jobList,
+			start:                        start,
+			sourceDupStats:               sourceDupStats,
+			dedupKeys:                    dedupKeys,
+		})
 }

-func (proc *Handle) generateTransformationMessage(partition string, subJobs subJob, eventSchemaJobs, archivalJobs []*jobsdb.JobT, connectionDetailsMap map[string]*types.ConnectionDetails, statusDetailsMap map[string]map[string]*types.StatusDetail, reportMetrics []*types.PUReportedMetric, destFilterStatusDetailMap map[string]map[string]*types.StatusDetail, inCountMetadataMap map[string]MetricMetadata, inCountMap, outCountMap map[string]int64, totalEvents int, marshalStart time.Time, groupedEventsBySourceId map[SourceIDT][]transformer.TransformerEvent, eventsByMessageID map[string]types.SingularEventWithReceivedAt, procErrorJobs []*jobsdb.JobT, jobIDToSpecificDestMapOnly map[int64]string, groupedEvents map[string][]transformer.TransformerEvent, uniqueMessageIdsBySrcDestKey map[string]map[string]struct{}, statusList []*jobsdb.JobStatusT, jobList []*jobsdb.JobT, start time.Time, sourceDupStats map[dupStatKey]int, dedupKeys map[string]struct{}) (*transformationMessage, error) {
+func (proc *Handle) generateTransformationMessage(preTrans *preTransformationMessage) (*transformationMessage, error) {
 	g, groupCtx := errgroup.WithContext(context.Background())

 	g.Go(func() error {
-		if len(eventSchemaJobs) == 0 {
+		if len(preTrans.eventSchemaJobs) == 0 {
 			return nil
 		}
 		err := misc.RetryWithNotify(
@@ -1934,19 +1996,19 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ
 				return proc.eventSchemaDB.WithStoreSafeTx(
 					ctx,
 					func(tx jobsdb.StoreSafeTx) error {
-						return proc.eventSchemaDB.StoreInTx(ctx, tx, eventSchemaJobs)
+						return proc.eventSchemaDB.StoreInTx(ctx, tx, preTrans.eventSchemaJobs)
 					},
 				)
 			}, proc.sendRetryStoreStats)
 		if err != nil {
 			return fmt.Errorf("store into event schema table failed with error: %v", err)
 		}
-		proc.logger.Debug("[Processor] Total jobs written to event_schema: ", len(eventSchemaJobs))
+		proc.logger.Debug("[Processor] Total jobs written to event_schema: ", len(preTrans.eventSchemaJobs))
 		return nil
 	})

 	g.Go(func() error {
-		if len(archivalJobs) == 0 {
+		if len(preTrans.archivalJobs) == 0 {
 			return nil
 		}
 		err := misc.RetryWithNotify(
@@ -1957,14 +2019,14 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ
 				return proc.archivalDB.WithStoreSafeTx(
 					ctx,
 					func(tx jobsdb.StoreSafeTx) error {
-						return proc.archivalDB.StoreInTx(ctx, tx, archivalJobs)
+						return proc.archivalDB.StoreInTx(ctx, tx, preTrans.archivalJobs)
 					},
 				)
 			}, proc.sendRetryStoreStats)
 		if err != nil {
 			return fmt.Errorf("store into archival table failed with error: %v", err)
 		}
-		proc.logger.Debug("[Processor] Total jobs written to archiver: ", len(archivalJobs))
+		proc.logger.Debug("[Processor] Total jobs written to archiver: ", len(preTrans.archivalJobs))
 		return nil
 	})

@@ -1974,48 +2036,48 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ

 	// REPORTING - GATEWAY metrics - START
 	if proc.isReportingEnabled() {
-		types.AssertSameKeys(connectionDetailsMap, statusDetailsMap)
-		for k, cd := range connectionDetailsMap {
-			for _, sd := range statusDetailsMap[k] {
+		types.AssertSameKeys(preTrans.connectionDetailsMap, preTrans.statusDetailsMap)
+		for k, cd := range preTrans.connectionDetailsMap {
+			for _, sd := range preTrans.statusDetailsMap[k] {
 				m := &types.PUReportedMetric{
 					ConnectionDetails: *cd,
 					PUDetails:         *types.CreatePUDetails("", types.GATEWAY, false, true),
 					StatusDetail:      sd,
 				}
-				reportMetrics = append(reportMetrics, m)
+				preTrans.reportMetrics = append(preTrans.reportMetrics, m)
 			}

-			for _, dsd := range destFilterStatusDetailMap[k] {
+			for _, dsd := range preTrans.destFilterStatusDetailMap[k] {
 				destFilterMetric := &types.PUReportedMetric{
 					ConnectionDetails: *cd,
 					PUDetails:         *types.CreatePUDetails(types.GATEWAY, types.DESTINATION_FILTER, false, false),
 					StatusDetail:      dsd,
 				}
-				reportMetrics = append(reportMetrics, destFilterMetric)
+				preTrans.reportMetrics = append(preTrans.reportMetrics, destFilterMetric)
 			}
 		}
 		// empty failedCountMap because no failures,
 		// events are just dropped at this point if no destination is found to route the events
 		diffMetrics := getDiffMetrics(
 			types.GATEWAY,
 			types.DESTINATION_FILTER,
-			inCountMetadataMap,
-			inCountMap,
-			outCountMap,
+			preTrans.inCountMetadataMap,
+			preTrans.inCountMap,
+			preTrans.outCountMap,
 			map[string]int64{},
 			map[string]int64{},
 			proc.statsFactory,
 		)
-		reportMetrics = append(reportMetrics, diffMetrics...)
+		preTrans.reportMetrics = append(preTrans.reportMetrics, diffMetrics...)
 	}
 	// REPORTING - GATEWAY metrics - END

-	proc.stats.statNumEvents(partition).Count(totalEvents)
+	proc.stats.statNumEvents(preTrans.partition).Count(preTrans.totalEvents)

-	marshalTime := time.Since(marshalStart)
-	defer proc.stats.marshalSingularEvents(partition).SendTiming(marshalTime)
+	marshalTime := time.Since(preTrans.marshalStart)
+	defer proc.stats.marshalSingularEvents(preTrans.partition).SendTiming(marshalTime)

-	for sourceID, events := range groupedEventsBySourceId {
+	for sourceID, events := range preTrans.groupedEventsBySourceId {
 		source, err := proc.getSourceBySourceID(string(sourceID))
 		if err != nil {
 			continue
@@ -2030,15 +2092,15 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ
 	// Placing the trackingPlan validation filters here.
 	// Else further down events are duplicated by destId, so multiple validation takes places for same event
 	validateEventsStart := time.Now()
-	validatedEventsBySourceId, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap := proc.validateEvents(groupedEventsBySourceId, eventsByMessageID)
+	validatedEventsBySourceId, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap := proc.validateEvents(preTrans.groupedEventsBySourceId, preTrans.eventsByMessageID)
 	validateEventsTime := time.Since(validateEventsStart)
-	defer proc.stats.validateEventsTime(partition).SendTiming(validateEventsTime)
+	defer proc.stats.validateEventsTime(preTrans.partition).SendTiming(validateEventsTime)

 	// Appending validatedErrorJobs to procErrorJobs
-	procErrorJobs = append(procErrorJobs, validatedErrorJobs...)
+	preTrans.procErrorJobs = append(preTrans.procErrorJobs, validatedErrorJobs...)

 	// Appending validatedReportMetrics to reportMetrics
-	reportMetrics = append(reportMetrics, validatedReportMetrics...)
+	preTrans.reportMetrics = append(preTrans.reportMetrics, validatedReportMetrics...)
 	// TRACKING PLAN - END

 	// The below part further segregates events by sourceID and DestinationID.
@@ -2059,7 +2121,7 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ
 				enabledDestinationsList := proc.getConsentFilteredDestinations(
 					singularEvent,
 					lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool {
-						destId := jobIDToSpecificDestMapOnly[event.Metadata.JobID]
+						destId := preTrans.jobIDToSpecificDestMapOnly[event.Metadata.JobID]
 						if destId != "" {
 							return destId == item.ID
 						}
Expand Down Expand Up @@ -2094,49 +2156,45 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ
metadata := shallowEventCopy.Metadata
srcAndDestKey := getKeyFromSourceAndDest(metadata.SourceID, metadata.DestinationID)
// We have at-least one event so marking it good
_, ok := groupedEvents[srcAndDestKey]
_, ok := preTrans.groupedEvents[srcAndDestKey]
if !ok {
groupedEvents[srcAndDestKey] = make([]transformer.TransformerEvent, 0)
preTrans.groupedEvents[srcAndDestKey] = make([]transformer.TransformerEvent, 0)
}
groupedEvents[srcAndDestKey] = append(groupedEvents[srcAndDestKey],
preTrans.groupedEvents[srcAndDestKey] = append(preTrans.groupedEvents[srcAndDestKey],
shallowEventCopy)
if _, ok := uniqueMessageIdsBySrcDestKey[srcAndDestKey]; !ok {
uniqueMessageIdsBySrcDestKey[srcAndDestKey] = make(map[string]struct{})
if _, ok := preTrans.uniqueMessageIdsBySrcDestKey[srcAndDestKey]; !ok {
preTrans.uniqueMessageIdsBySrcDestKey[srcAndDestKey] = make(map[string]struct{})
}
uniqueMessageIdsBySrcDestKey[srcAndDestKey][metadata.MessageID] = struct{}{}
preTrans.uniqueMessageIdsBySrcDestKey[srcAndDestKey][metadata.MessageID] = struct{}{}
}
}
}
}

if len(statusList) != len(jobList) {
panic(fmt.Errorf("len(statusList):%d != len(jobList):%d", len(statusList), len(jobList)))
}
trackedUsersReportGenStart := time.Now()
trackedUsersReports := proc.trackedUsersReporter.GenerateReportsFromJobs(jobList, proc.getNonEventStreamSources())
proc.stats.trackedUsersReportGeneration(partition).SendTiming(time.Since(trackedUsersReportGenStart))
trackedUsersReports := proc.trackedUsersReporter.GenerateReportsFromJobs(preTrans.jobList, proc.getNonEventStreamSources())
proc.stats.trackedUsersReportGeneration(preTrans.partition).SendTiming(time.Since(trackedUsersReportGenStart))

processTime := time.Since(start)
proc.stats.processJobsTime(partition).SendTiming(processTime)
processJobThroughput := throughputPerSecond(totalEvents, processTime)
processTime := time.Since(preTrans.start)
proc.stats.processJobsTime(preTrans.partition).SendTiming(processTime)
processJobThroughput := throughputPerSecond(preTrans.totalEvents, processTime)
// processJob throughput per second.
proc.stats.processJobThroughput(partition).Count(processJobThroughput)
proc.stats.processJobThroughput(preTrans.partition).Count(processJobThroughput)
return &transformationMessage{
groupedEvents,
preTrans.groupedEvents,
trackingPlanEnabledMap,
eventsByMessageID,
uniqueMessageIdsBySrcDestKey,
reportMetrics,
statusList,
procErrorJobs,
sourceDupStats,
dedupKeys,

totalEvents,
start,

subJobs.hasMore,
subJobs.rsourcesStats,
preTrans.eventsByMessageID,
preTrans.uniqueMessageIdsBySrcDestKey,
preTrans.reportMetrics,
preTrans.statusList,
preTrans.procErrorJobs,
preTrans.sourceDupStats,
preTrans.dedupKeys,

preTrans.totalEvents,
preTrans.start,

preTrans.subJobs.hasMore,
preTrans.subJobs.rsourcesStats,
trackedUsersReports,
}, nil
}
Expand Down Expand Up @@ -2420,7 +2478,33 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans
}
}
}
return proc.generateTransformationMessage(partition, subJobs, eventSchemaJobs, archivalJobs, connectionDetailsMap, statusDetailsMap, reportMetrics, destFilterStatusDetailMap, inCountMetadataMap, inCountMap, outCountMap, totalEvents, marshalStart, groupedEventsBySourceId, eventsByMessageID, procErrorJobs, jobIDToSpecificDestMapOnly, groupedEvents, uniqueMessageIdsBySrcDestKey, statusList, jobList, start, sourceDupStats, dedupKeys)
return proc.generateTransformationMessage(
&preTransformationMessage{
partition: partition,
subJobs: subJobs,
eventSchemaJobs: eventSchemaJobs,
archivalJobs: archivalJobs,
connectionDetailsMap: connectionDetailsMap,
statusDetailsMap: statusDetailsMap,
reportMetrics: reportMetrics,
destFilterStatusDetailMap: destFilterStatusDetailMap,
inCountMetadataMap: inCountMetadataMap,
inCountMap: inCountMap,
outCountMap: outCountMap,
totalEvents: totalEvents,
marshalStart: marshalStart,
groupedEventsBySourceId: groupedEventsBySourceId,
eventsByMessageID: eventsByMessageID,
procErrorJobs: procErrorJobs,
jobIDToSpecificDestMapOnly: jobIDToSpecificDestMapOnly,
groupedEvents: groupedEvents,
uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey,
statusList: statusList,
jobList: jobList,
start: start,
sourceDupStats: sourceDupStats,
dedupKeys: dedupKeys,
})
}

type transformationMessage struct {
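
Beyond formatting, the processor.go hunks above carry a small refactor: generateTransformationMessage previously took 23 positional arguments, and both of its callers (processJobsForDestV2 and processJobsForDest) now pass a single *preTransformationMessage whose fields are named at the call site; the statusList/jobList length assertion moves ahead of the call, and the dedup batch lookup is gated behind proc.config.enableDedup so GetBatch is skipped when dedup is disabled. A minimal, self-contained sketch of the same parameter-object pattern — all names below are illustrative, not rudder-server's:

package main

import (
	"fmt"
	"time"
)

// preSummary is a hypothetical parameter object standing in for
// preTransformationMessage: related arguments travel as named fields
// instead of a long positional list.
type preSummary struct {
	partition   string
	totalEvents int
	start       time.Time
}

// Before: positional parameters, easy to misorder at every call site.
func summarizePositional(partition string, totalEvents int, start time.Time) string {
	return fmt.Sprintf("%s: %d events in %s", partition, totalEvents, time.Since(start))
}

// After: one pointer argument; call sites name every field explicitly,
// and new fields can be added without touching every caller's signature.
func summarize(p *preSummary) string {
	return fmt.Sprintf("%s: %d events in %s", p.partition, p.totalEvents, time.Since(p.start))
}

func main() {
	start := time.Now()
	fmt.Println(summarizePositional("partition-1", 42, start))
	fmt.Println(summarize(&preSummary{
		partition:   "partition-1",
		totalEvents: 42,
		start:       start,
	}))
}
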
7 changes: 3 additions & 4 deletions services/dedup/scylla/scylla_test.go
@@ -1,10 +1,10 @@
 package scylla

 import (
-	"github.com/google/uuid"
 	"strings"
 	"testing"

+	"github.com/google/uuid"
 	"github.com/ory/dockertest/v3"
 	"github.com/stretchr/testify/require"

@@ -126,9 +126,8 @@ func Test_Scylla(t *testing.T) {
 	})
 }

-//Benchmark_ScyllaGet/Get-12 20 71365577 ns/op
-//Benchmark_ScyllaGet/GetBatch-12 266 4006687 ns/op
-
+// Benchmark_ScyllaGet/Get-12 20 71365577 ns/op
+// Benchmark_ScyllaGet/GetBatch-12 266 4006687 ns/op
 func Benchmark_ScyllaGet(b *testing.B) {
 	pool, err := dockertest.NewPool("")
 	require.NoError(b, err)
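
The scylla_test.go hunks are the formatting fixes the commit message refers to: goimports moves the third-party github.com/google/uuid import out of the standard-library group, and the benchmark-result comments gain a space after // while the blank line before the function is dropped, making them a doc comment for Benchmark_ScyllaGet — consistent with gofmt's doc-comment normalization in recent Go releases. A small hypothetical benchmark showing the resulting shape (illustrative names, not from this repository):

package fmtdemo

import (
	"strings" // standard library: first import group
	"testing"

	"github.com/google/uuid" // third-party: separate group, as goimports arranges it
)

// Example doc comment written "// " with a space after the slashes and
// placed directly above the function it documents, as the diff above does.
func Benchmark_JoinWithID(b *testing.B) {
	parts := []string{"a", "b", "c"}
	for i := 0; i < b.N; i++ {
		_ = strings.Join(parts, uuid.NewString())
	}
}
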
