diff --git a/archiver/archiver_isolation_test.go b/archiver/archiver_isolation_test.go index 02f5ecb7a1..a8a013fc43 100644 --- a/archiver/archiver_isolation_test.go +++ b/archiver/archiver_isolation_test.go @@ -98,11 +98,10 @@ func dummyConfig( ID: "someSourceDefinitionID", Category: "someCategory", }, - WriteKey: trand.String(10), - Transient: false, - EventSchemasEnabled: false, - Enabled: true, - WorkspaceID: workspaceID, + WriteKey: trand.String(10), + Transient: false, + Enabled: true, + WorkspaceID: workspaceID, }) } configMap[workspaceID] = wConfig diff --git a/backend-config/backend-config.go b/backend-config/backend-config.go index a48ee211f6..5fdbea9416 100644 --- a/backend-config/backend-config.go +++ b/backend-config/backend-config.go @@ -153,6 +153,7 @@ func filterProcessorEnabledDestinations(config ConfigT) ConfigT { source.Destinations = destinations modifiedConfig.Sources = append(modifiedConfig.Sources, source) } + modifiedConfig.Settings = config.Settings return modifiedConfig } diff --git a/backend-config/replay_types.go b/backend-config/replay_types.go index 6c34c92777..1d8c804edc 100644 --- a/backend-config/replay_types.go +++ b/backend-config/replay_types.go @@ -25,7 +25,6 @@ func (config *ConfigT) ApplyReplaySources() { newSource.ID = id newSource.OriginalID = s.ID newSource.WriteKey = id - newSource.EventSchemasEnabled = false newSource.Config = lo.OmitByKeys(newSource.Config, []string{"eventUpload"}) // no event uploads for replay sources for now newSource.Destinations = nil // destinations are added later return &newSource diff --git a/backend-config/types.go b/backend-config/types.go index 1f37c8f92c..13e817c91f 100644 --- a/backend-config/types.go +++ b/backend-config/types.go @@ -69,7 +69,6 @@ type SourceT struct { WriteKey string DgSourceTrackingPlanConfig DgSourceTrackingPlanConfigT Transient bool - EventSchemasEnabled bool } func (s *SourceT) IsReplaySource() bool { @@ -124,7 +123,8 @@ func (c *ConfigT) DestinationsMap() map[string]*DestinationT { } type Settings struct { - DataRetention DataRetention `json:"dataRetention"` + DataRetention DataRetention `json:"dataRetention"` + EventAuditEnabled bool `json:"eventAuditEnabled"` } type DataRetention struct { diff --git a/processor/processor.go b/processor/processor.go index d61f24bc06..a1f9ce8af2 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -139,7 +139,7 @@ type Handle struct { asyncInit *misc.AsyncInit eventSchemaV2Enabled bool archivalEnabled misc.ValueLoader[bool] - eventSchemaV2AllSources bool + eventAuditEnabled map[string]bool } adaptiveLimit func(int64) int64 @@ -589,7 +589,6 @@ func (proc *Handle) loadConfig() { // EventSchemas feature. false by default proc.config.enableEventSchemasFeature = config.GetBoolVar(false, "EventSchemas.enableEventSchemasFeature") proc.config.eventSchemaV2Enabled = config.GetBoolVar(false, "EventSchemas2.enabled") - proc.config.eventSchemaV2AllSources = config.GetBoolVar(false, "EventSchemas2.enableAllSources") proc.config.batchDestinations = misc.BatchDestinations() proc.config.transformTimesPQLength = config.GetIntVar(5, 1, "Processor.transformTimesPQLength") proc.config.transformerURL = config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090") @@ -723,6 +722,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) { sourceIdDestinationMap = make(map[string][]backendconfig.DestinationT) sourceIdSourceMap = map[string]backendconfig.SourceT{} destinationIDtoTypeMap = make(map[string]string) + eventAuditEnabled = make(map[string]bool) ) for workspaceID, wConfig := range config { for i := range wConfig.Sources { @@ -738,6 +738,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) { } } workspaceLibrariesMap[workspaceID] = wConfig.Libraries + eventAuditEnabled[workspaceID] = wConfig.Settings.EventAuditEnabled } proc.config.configSubscriberLock.Lock() proc.config.destConsentCategories = destConsentCategories @@ -745,6 +746,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) { proc.config.sourceIdDestinationMap = sourceIdDestinationMap proc.config.sourceIdSourceMap = sourceIdSourceMap proc.config.destinationIDtoTypeMap = destinationIDtoTypeMap + proc.config.eventAuditEnabled = eventAuditEnabled proc.config.configSubscriberLock.Unlock() if !initDone { initDone = true @@ -1377,6 +1379,12 @@ type dupStatKey struct { equalSize bool } +func (proc *Handle) eventAuditEnabled(workspaceID string) bool { + proc.config.configSubscriberLock.RLock() + defer proc.config.configSubscriberLock.RUnlock() + return proc.config.eventAuditEnabled[workspaceID] +} + func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transformationMessage { if proc.limiter.preprocess != nil { defer proc.limiter.preprocess.BeginWithPriority(partition, proc.getLimiterPriority(partition))() @@ -1485,8 +1493,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf sourceIsTransient := proc.transientSources.Apply(source.ID) if proc.config.eventSchemaV2Enabled && // schemas enabled - // source has schemas enabled or if we override schemas for all sources - (source.EventSchemasEnabled || proc.config.eventSchemaV2AllSources) && + proc.eventAuditEnabled(batchEvent.WorkspaceId) && // TODO: could use source.SourceDefinition.Category instead? commonMetadataFromSingularEvent.SourceJobRunID == "" && !sourceIsTransient { diff --git a/processor/processor_test.go b/processor/processor_test.go index e5e88e89bf..23932da261 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -216,10 +216,9 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, }, { - ID: SourceIDEnabledNoUT, - WriteKey: WriteKeyEnabledNoUT, - Enabled: true, - EventSchemasEnabled: true, + ID: SourceIDEnabledNoUT, + WriteKey: WriteKeyEnabledNoUT, + Enabled: true, SourceDefinition: backendconfig.SourceDefinitionT{ Category: "eventStream", }, @@ -453,6 +452,9 @@ var sampleBackendConfig = backendconfig.ConfigT{ }, }, }, + Settings: backendconfig.Settings{ + EventAuditEnabled: true, + }, } func initProcessor() { @@ -560,6 +562,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabled), + WorkspaceId: sampleWorkspaceID, }, { UUID: uuid.New(), @@ -579,6 +582,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { EventCount: 2, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, }, { UUID: uuid.New(), @@ -590,6 +594,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabled), + WorkspaceId: sampleWorkspaceID, }, { UUID: uuid.New(), @@ -601,6 +606,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { EventCount: 1, LastJobStatus: jobsdb.JobStatusT{}, Parameters: createBatchParameters(SourceIDEnabled), + WorkspaceId: sampleWorkspaceID, }, { UUID: uuid.New(), @@ -618,8 +624,9 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { }, createMessagePayload, ), - EventCount: 3, - Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + EventCount: 3, + Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT), + WorkspaceId: sampleWorkspaceID, }, } mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) diff --git a/schema-forwarder/internal/testdata/configdata.go b/schema-forwarder/internal/testdata/configdata.go index 2bc66f7d00..21f747500e 100644 --- a/schema-forwarder/internal/testdata/configdata.go +++ b/schema-forwarder/internal/testdata/configdata.go @@ -272,5 +272,6 @@ var SampleBackendConfig = backendconfig.ConfigT{ DataRetention: backendconfig.DataRetention{ DisableReportingPII: true, }, + EventAuditEnabled: false, }, }