From 99e128d6e7d4951140aabdb8badde1e7da7c2de0 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 22 Oct 2024 11:56:24 +0530 Subject: [PATCH] chore: fix fmt --- processor/processor.go | 206 +++++++++++++++++++-------- services/dedup/scylla/scylla_test.go | 7 +- 2 files changed, 148 insertions(+), 65 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index 8b5bc5e30b..f73edb1d60 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1595,6 +1595,33 @@ func (proc *Handle) eventAuditEnabled(workspaceID string) bool { return proc.config.eventAuditEnabled[workspaceID] } +type preTransformationMessage struct { + partition string + subJobs subJob + eventSchemaJobs []*jobsdb.JobT + archivalJobs []*jobsdb.JobT + connectionDetailsMap map[string]*types.ConnectionDetails + statusDetailsMap map[string]map[string]*types.StatusDetail + reportMetrics []*types.PUReportedMetric + destFilterStatusDetailMap map[string]map[string]*types.StatusDetail + inCountMetadataMap map[string]MetricMetadata + inCountMap map[string]int64 + outCountMap map[string]int64 + totalEvents int + marshalStart time.Time + groupedEventsBySourceId map[SourceIDT][]transformer.TransformerEvent + eventsByMessageID map[string]types.SingularEventWithReceivedAt + procErrorJobs []*jobsdb.JobT + jobIDToSpecificDestMapOnly map[int64]string + groupedEvents map[string][]transformer.TransformerEvent + uniqueMessageIdsBySrcDestKey map[string]map[string]struct{} + statusList []*jobsdb.JobStatusT + jobList []*jobsdb.JobT + start time.Time + sourceDupStats map[dupStatKey]int + dedupKeys map[string]struct{} +} + func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*transformationMessage, error) { if proc.limiter.preprocess != nil { defer proc.limiter.preprocess.BeginWithPriority(partition, proc.getLimiterPriority(partition))() @@ -1745,9 +1772,14 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra } } - keyMap, sizeMap, err := proc.dedup.GetBatch(dedupKeysWithWorkspaceID) - if err != nil { - return nil, err + var keyMap map[dedupTypes.KeyValue]bool + var sizeMap map[dedupTypes.KeyValue]int64 + var err error + if proc.config.enableDedup { + keyMap, sizeMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID) + if err != nil { + return nil, err + } } for _, event := range jobsWithMetaData { sourceId := event.eventParams.SourceId @@ -1916,14 +1948,44 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra } } - return proc.generateTransformationMessage(partition, subJobs, eventSchemaJobs, archivalJobs, connectionDetailsMap, statusDetailsMap, reportMetrics, destFilterStatusDetailMap, inCountMetadataMap, inCountMap, outCountMap, totalEvents, marshalStart, groupedEventsBySourceId, eventsByMessageID, procErrorJobs, jobIDToSpecificDestMapOnly, groupedEvents, uniqueMessageIdsBySrcDestKey, statusList, jobList, start, sourceDupStats, dedupKeys) + if len(statusList) != len(jobList) { + panic(fmt.Errorf("len(statusList):%d != len(jobList):%d", len(statusList), len(jobList))) + } + + return proc.generateTransformationMessage( + &preTransformationMessage{ + partition: partition, + subJobs: subJobs, + eventSchemaJobs: eventSchemaJobs, + archivalJobs: archivalJobs, + connectionDetailsMap: connectionDetailsMap, + statusDetailsMap: statusDetailsMap, + reportMetrics: reportMetrics, + destFilterStatusDetailMap: destFilterStatusDetailMap, + inCountMetadataMap: inCountMetadataMap, + inCountMap: inCountMap, + outCountMap: outCountMap, + totalEvents: 
totalEvents, + marshalStart: marshalStart, + groupedEventsBySourceId: groupedEventsBySourceId, + eventsByMessageID: eventsByMessageID, + procErrorJobs: procErrorJobs, + jobIDToSpecificDestMapOnly: jobIDToSpecificDestMapOnly, + groupedEvents: groupedEvents, + uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey, + statusList: statusList, + jobList: jobList, + start: start, + sourceDupStats: sourceDupStats, + dedupKeys: dedupKeys, + }) } -func (proc *Handle) generateTransformationMessage(partition string, subJobs subJob, eventSchemaJobs, archivalJobs []*jobsdb.JobT, connectionDetailsMap map[string]*types.ConnectionDetails, statusDetailsMap map[string]map[string]*types.StatusDetail, reportMetrics []*types.PUReportedMetric, destFilterStatusDetailMap map[string]map[string]*types.StatusDetail, inCountMetadataMap map[string]MetricMetadata, inCountMap, outCountMap map[string]int64, totalEvents int, marshalStart time.Time, groupedEventsBySourceId map[SourceIDT][]transformer.TransformerEvent, eventsByMessageID map[string]types.SingularEventWithReceivedAt, procErrorJobs []*jobsdb.JobT, jobIDToSpecificDestMapOnly map[int64]string, groupedEvents map[string][]transformer.TransformerEvent, uniqueMessageIdsBySrcDestKey map[string]map[string]struct{}, statusList []*jobsdb.JobStatusT, jobList []*jobsdb.JobT, start time.Time, sourceDupStats map[dupStatKey]int, dedupKeys map[string]struct{}) (*transformationMessage, error) { +func (proc *Handle) generateTransformationMessage(preTrans *preTransformationMessage) (*transformationMessage, error) { g, groupCtx := errgroup.WithContext(context.Background()) g.Go(func() error { - if len(eventSchemaJobs) == 0 { + if len(preTrans.eventSchemaJobs) == 0 { return nil } err := misc.RetryWithNotify( @@ -1934,19 +1996,19 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ return proc.eventSchemaDB.WithStoreSafeTx( ctx, func(tx jobsdb.StoreSafeTx) error { - return proc.eventSchemaDB.StoreInTx(ctx, tx, eventSchemaJobs) + return proc.eventSchemaDB.StoreInTx(ctx, tx, preTrans.eventSchemaJobs) }, ) }, proc.sendRetryStoreStats) if err != nil { return fmt.Errorf("store into event schema table failed with error: %v", err) } - proc.logger.Debug("[Processor] Total jobs written to event_schema: ", len(eventSchemaJobs)) + proc.logger.Debug("[Processor] Total jobs written to event_schema: ", len(preTrans.eventSchemaJobs)) return nil }) g.Go(func() error { - if len(archivalJobs) == 0 { + if len(preTrans.archivalJobs) == 0 { return nil } err := misc.RetryWithNotify( @@ -1957,14 +2019,14 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ return proc.archivalDB.WithStoreSafeTx( ctx, func(tx jobsdb.StoreSafeTx) error { - return proc.archivalDB.StoreInTx(ctx, tx, archivalJobs) + return proc.archivalDB.StoreInTx(ctx, tx, preTrans.archivalJobs) }, ) }, proc.sendRetryStoreStats) if err != nil { return fmt.Errorf("store into archival table failed with error: %v", err) } - proc.logger.Debug("[Processor] Total jobs written to archiver: ", len(archivalJobs)) + proc.logger.Debug("[Processor] Total jobs written to archiver: ", len(preTrans.archivalJobs)) return nil }) @@ -1974,24 +2036,24 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ // REPORTING - GATEWAY metrics - START if proc.isReportingEnabled() { - types.AssertSameKeys(connectionDetailsMap, statusDetailsMap) - for k, cd := range connectionDetailsMap { - for _, sd := range statusDetailsMap[k] { + 
types.AssertSameKeys(preTrans.connectionDetailsMap, preTrans.statusDetailsMap) + for k, cd := range preTrans.connectionDetailsMap { + for _, sd := range preTrans.statusDetailsMap[k] { m := &types.PUReportedMetric{ ConnectionDetails: *cd, PUDetails: *types.CreatePUDetails("", types.GATEWAY, false, true), StatusDetail: sd, } - reportMetrics = append(reportMetrics, m) + preTrans.reportMetrics = append(preTrans.reportMetrics, m) } - for _, dsd := range destFilterStatusDetailMap[k] { + for _, dsd := range preTrans.destFilterStatusDetailMap[k] { destFilterMetric := &types.PUReportedMetric{ ConnectionDetails: *cd, PUDetails: *types.CreatePUDetails(types.GATEWAY, types.DESTINATION_FILTER, false, false), StatusDetail: dsd, } - reportMetrics = append(reportMetrics, destFilterMetric) + preTrans.reportMetrics = append(preTrans.reportMetrics, destFilterMetric) } } // empty failedCountMap because no failures, @@ -1999,23 +2061,23 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ diffMetrics := getDiffMetrics( types.GATEWAY, types.DESTINATION_FILTER, - inCountMetadataMap, - inCountMap, - outCountMap, + preTrans.inCountMetadataMap, + preTrans.inCountMap, + preTrans.outCountMap, map[string]int64{}, map[string]int64{}, proc.statsFactory, ) - reportMetrics = append(reportMetrics, diffMetrics...) + preTrans.reportMetrics = append(preTrans.reportMetrics, diffMetrics...) } // REPORTING - GATEWAY metrics - END - proc.stats.statNumEvents(partition).Count(totalEvents) + proc.stats.statNumEvents(preTrans.partition).Count(preTrans.totalEvents) - marshalTime := time.Since(marshalStart) - defer proc.stats.marshalSingularEvents(partition).SendTiming(marshalTime) + marshalTime := time.Since(preTrans.marshalStart) + defer proc.stats.marshalSingularEvents(preTrans.partition).SendTiming(marshalTime) - for sourceID, events := range groupedEventsBySourceId { + for sourceID, events := range preTrans.groupedEventsBySourceId { source, err := proc.getSourceBySourceID(string(sourceID)) if err != nil { continue @@ -2030,15 +2092,15 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ // Placing the trackingPlan validation filters here. // Else further down events are duplicated by destId, so multiple validation takes places for same event validateEventsStart := time.Now() - validatedEventsBySourceId, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap := proc.validateEvents(groupedEventsBySourceId, eventsByMessageID) + validatedEventsBySourceId, validatedReportMetrics, validatedErrorJobs, trackingPlanEnabledMap := proc.validateEvents(preTrans.groupedEventsBySourceId, preTrans.eventsByMessageID) validateEventsTime := time.Since(validateEventsStart) - defer proc.stats.validateEventsTime(partition).SendTiming(validateEventsTime) + defer proc.stats.validateEventsTime(preTrans.partition).SendTiming(validateEventsTime) // Appending validatedErrorJobs to procErrorJobs - procErrorJobs = append(procErrorJobs, validatedErrorJobs...) + preTrans.procErrorJobs = append(preTrans.procErrorJobs, validatedErrorJobs...) // Appending validatedReportMetrics to reportMetrics - reportMetrics = append(reportMetrics, validatedReportMetrics...) + preTrans.reportMetrics = append(preTrans.reportMetrics, validatedReportMetrics...) // TRACKING PLAN - END // The below part further segregates events by sourceID and DestinationID. 
@@ -2059,7 +2121,7 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ enabledDestinationsList := proc.getConsentFilteredDestinations( singularEvent, lo.Filter(proc.getEnabledDestinations(sourceId, *destType), func(item backendconfig.DestinationT, index int) bool { - destId := jobIDToSpecificDestMapOnly[event.Metadata.JobID] + destId := preTrans.jobIDToSpecificDestMapOnly[event.Metadata.JobID] if destId != "" { return destId == item.ID } @@ -2094,49 +2156,45 @@ func (proc *Handle) generateTransformationMessage(partition string, subJobs subJ metadata := shallowEventCopy.Metadata srcAndDestKey := getKeyFromSourceAndDest(metadata.SourceID, metadata.DestinationID) // We have at-least one event so marking it good - _, ok := groupedEvents[srcAndDestKey] + _, ok := preTrans.groupedEvents[srcAndDestKey] if !ok { - groupedEvents[srcAndDestKey] = make([]transformer.TransformerEvent, 0) + preTrans.groupedEvents[srcAndDestKey] = make([]transformer.TransformerEvent, 0) } - groupedEvents[srcAndDestKey] = append(groupedEvents[srcAndDestKey], + preTrans.groupedEvents[srcAndDestKey] = append(preTrans.groupedEvents[srcAndDestKey], shallowEventCopy) - if _, ok := uniqueMessageIdsBySrcDestKey[srcAndDestKey]; !ok { - uniqueMessageIdsBySrcDestKey[srcAndDestKey] = make(map[string]struct{}) + if _, ok := preTrans.uniqueMessageIdsBySrcDestKey[srcAndDestKey]; !ok { + preTrans.uniqueMessageIdsBySrcDestKey[srcAndDestKey] = make(map[string]struct{}) } - uniqueMessageIdsBySrcDestKey[srcAndDestKey][metadata.MessageID] = struct{}{} + preTrans.uniqueMessageIdsBySrcDestKey[srcAndDestKey][metadata.MessageID] = struct{}{} } } } } - - if len(statusList) != len(jobList) { - panic(fmt.Errorf("len(statusList):%d != len(jobList):%d", len(statusList), len(jobList))) - } trackedUsersReportGenStart := time.Now() - trackedUsersReports := proc.trackedUsersReporter.GenerateReportsFromJobs(jobList, proc.getNonEventStreamSources()) - proc.stats.trackedUsersReportGeneration(partition).SendTiming(time.Since(trackedUsersReportGenStart)) + trackedUsersReports := proc.trackedUsersReporter.GenerateReportsFromJobs(preTrans.jobList, proc.getNonEventStreamSources()) + proc.stats.trackedUsersReportGeneration(preTrans.partition).SendTiming(time.Since(trackedUsersReportGenStart)) - processTime := time.Since(start) - proc.stats.processJobsTime(partition).SendTiming(processTime) - processJobThroughput := throughputPerSecond(totalEvents, processTime) + processTime := time.Since(preTrans.start) + proc.stats.processJobsTime(preTrans.partition).SendTiming(processTime) + processJobThroughput := throughputPerSecond(preTrans.totalEvents, processTime) // processJob throughput per second. 
- proc.stats.processJobThroughput(partition).Count(processJobThroughput) + proc.stats.processJobThroughput(preTrans.partition).Count(processJobThroughput) return &transformationMessage{ - groupedEvents, + preTrans.groupedEvents, trackingPlanEnabledMap, - eventsByMessageID, - uniqueMessageIdsBySrcDestKey, - reportMetrics, - statusList, - procErrorJobs, - sourceDupStats, - dedupKeys, - - totalEvents, - start, - - subJobs.hasMore, - subJobs.rsourcesStats, + preTrans.eventsByMessageID, + preTrans.uniqueMessageIdsBySrcDestKey, + preTrans.reportMetrics, + preTrans.statusList, + preTrans.procErrorJobs, + preTrans.sourceDupStats, + preTrans.dedupKeys, + + preTrans.totalEvents, + preTrans.start, + + preTrans.subJobs.hasMore, + preTrans.subJobs.rsourcesStats, trackedUsersReports, }, nil } @@ -2420,7 +2478,33 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans } } } - return proc.generateTransformationMessage(partition, subJobs, eventSchemaJobs, archivalJobs, connectionDetailsMap, statusDetailsMap, reportMetrics, destFilterStatusDetailMap, inCountMetadataMap, inCountMap, outCountMap, totalEvents, marshalStart, groupedEventsBySourceId, eventsByMessageID, procErrorJobs, jobIDToSpecificDestMapOnly, groupedEvents, uniqueMessageIdsBySrcDestKey, statusList, jobList, start, sourceDupStats, dedupKeys) + return proc.generateTransformationMessage( + &preTransformationMessage{ + partition: partition, + subJobs: subJobs, + eventSchemaJobs: eventSchemaJobs, + archivalJobs: archivalJobs, + connectionDetailsMap: connectionDetailsMap, + statusDetailsMap: statusDetailsMap, + reportMetrics: reportMetrics, + destFilterStatusDetailMap: destFilterStatusDetailMap, + inCountMetadataMap: inCountMetadataMap, + inCountMap: inCountMap, + outCountMap: outCountMap, + totalEvents: totalEvents, + marshalStart: marshalStart, + groupedEventsBySourceId: groupedEventsBySourceId, + eventsByMessageID: eventsByMessageID, + procErrorJobs: procErrorJobs, + jobIDToSpecificDestMapOnly: jobIDToSpecificDestMapOnly, + groupedEvents: groupedEvents, + uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey, + statusList: statusList, + jobList: jobList, + start: start, + sourceDupStats: sourceDupStats, + dedupKeys: dedupKeys, + }) } type transformationMessage struct { diff --git a/services/dedup/scylla/scylla_test.go b/services/dedup/scylla/scylla_test.go index 08dab42b99..98e96580fa 100644 --- a/services/dedup/scylla/scylla_test.go +++ b/services/dedup/scylla/scylla_test.go @@ -1,10 +1,10 @@ package scylla import ( - "github.com/google/uuid" "strings" "testing" + "github.com/google/uuid" "github.com/ory/dockertest/v3" "github.com/stretchr/testify/require" @@ -126,9 +126,8 @@ func Test_Scylla(t *testing.T) { }) } -//Benchmark_ScyllaGet/Get-12 20 71365577 ns/op -//Benchmark_ScyllaGet/GetBatch-12 266 4006687 ns/op - +// Benchmark_ScyllaGet/Get-12 20 71365577 ns/op +// Benchmark_ScyllaGet/GetBatch-12 266 4006687 ns/op func Benchmark_ScyllaGet(b *testing.B) { pool, err := dockertest.NewPool("") require.NoError(b, err)
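

Reviewer note (editorial, not part of the patch): the processor.go change is a parameter-object refactor. generateTransformationMessage previously took 24 positional arguments, several of which share a type (two []*jobsdb.JobT slices, two map[string]int64 counters), so call sites were easy to miscall and hard to review. The patch groups them into *preTransformationMessage, built identically in processJobsForDest and processJobsForDestV2. A minimal sketch of why the struct form is safer, using hypothetical names rather than the real processor types:

package main

import "fmt"

// Hypothetical "before": adjacent same-typed parameters (inCount, outCount)
// can be transposed at a call site and the code still compiles.
func summarizeBefore(partition string, inCount, outCount map[string]int64, totalEvents int) string {
	return fmt.Sprintf("%s in=%d out=%d total=%d", partition, len(inCount), len(outCount), totalEvents)
}

// Hypothetical "after", mirroring the patch's preTransformationMessage:
// every value is named where it is passed, so fields cannot be swapped
// silently and new fields can be added without touching every caller.
type summarizeArgs struct {
	partition   string
	inCountMap  map[string]int64
	outCountMap map[string]int64
	totalEvents int
}

func summarize(a *summarizeArgs) string {
	return fmt.Sprintf("%s in=%d out=%d total=%d", a.partition, len(a.inCountMap), len(a.outCountMap), a.totalEvents)
}

func main() {
	fmt.Println(summarize(&summarizeArgs{
		partition:   "partition-0",
		inCountMap:  map[string]int64{"src": 2},
		outCountMap: map[string]int64{"src": 2},
		totalEvents: 2,
	}))
}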
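
On the dedup guard in processJobsForDestV2: when proc.config.enableDedup is false, GetBatch is never called and keyMap and sizeMap stay nil. That is safe for any read-only lookup that may still run afterwards, because indexing a nil map in Go returns the zero value (writing to one would panic). The benchmark comment fixed in scylla_test.go also records why the batched call exists: roughly 71.4 ms/op for Get versus 4.0 ms/op for GetBatch in the quoted numbers, about 18x less time per call even before accounting for GetBatch covering many keys at once. A minimal sketch of the nil-map behavior, with a stand-in key type rather than the real dedupTypes.KeyValue:

package main

import "fmt"

// keyValue stands in for the patch's dedupTypes.KeyValue; the real type
// lives in rudder-server's dedup package.
type keyValue struct {
	Key         string
	WorkspaceID string
}

func main() {
	// With dedup disabled, the patch skips GetBatch, so both maps stay nil.
	var keyMap map[keyValue]bool
	var sizeMap map[keyValue]int64

	k := keyValue{Key: "msg-1", WorkspaceID: "ws-1"}
	fmt.Println(keyMap[k])  // false: the event is not treated as a duplicate
	fmt.Println(sizeMap[k]) // 0: no recorded payload size

	// keyMap[k] = true here would panic: only lookups are safe on nil maps.
}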
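
The refactored generateTransformationMessage keeps the existing errgroup usage: the event schema and archival batches are stored from separate goroutines under errgroup.WithContext, and the first failure cancels the shared context. A self-contained sketch of that pattern, with a placeholder store function rather than jobsdb's StoreInTx API:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// storeJobs is a placeholder for a batched write such as
// eventSchemaDB.StoreInTx or archivalDB.StoreInTx in the patch.
func storeJobs(ctx context.Context, table string, jobs []string) error {
	select {
	case <-ctx.Done(): // abandoned once a sibling goroutine fails
		return ctx.Err()
	default:
		fmt.Printf("stored %d jobs in %s\n", len(jobs), table)
		return nil
	}
}

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	eventSchemaJobs := []string{"job-1", "job-2"}
	archivalJobs := []string{"job-3"}

	g.Go(func() error {
		if len(eventSchemaJobs) == 0 {
			return nil // mirror the patch's early return for empty batches
		}
		return storeJobs(ctx, "event_schema", eventSchemaJobs)
	})
	g.Go(func() error {
		if len(archivalJobs) == 0 {
			return nil
		}
		return storeJobs(ctx, "archival", archivalJobs)
	})

	// Wait returns the first non-nil error from either store.
	if err := g.Wait(); err != nil {
		fmt.Println("store failed:", err)
	}
}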