Skip to content

Commit

Permalink
feat: added flags for event audit
Browse files Browse the repository at this point in the history
  • Loading branch information
kanishkkatara committed Sep 13, 2023
1 parent 594828e commit 38221c0
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
3 changes: 2 additions & 1 deletion backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func (c *ConfigT) DestinationsMap() map[string]*DestinationT {
}

type Settings struct {
DataRetention DataRetention `json:"dataRetention"`
DataRetention DataRetention `json:"dataRetention"`
EventAuditEnabled bool `json:"eventAuditEnabled"`
}

type DataRetention struct {
Expand Down
9 changes: 5 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ type Handle struct {
asyncInit *misc.AsyncInit
eventSchemaV2Enabled bool
archivalEnabled bool
eventSchemaV2AllSources bool
eventAuditEnabled bool
}

adaptiveLimit func(int64) int64
Expand Down Expand Up @@ -598,7 +598,6 @@ func (proc *Handle) loadConfig() {
// EventSchemas2 feature.
config.RegisterBoolConfigVariable(false, &proc.config.eventSchemaV2Enabled, false, "EventSchemas2.enabled")
config.RegisterBoolConfigVariable(true, &proc.config.archivalEnabled, true, "archival.Enabled")
config.RegisterBoolConfigVariable(false, &proc.config.eventSchemaV2AllSources, false, "EventSchemas2.enableAllSources")
proc.config.batchDestinations = misc.BatchDestinations()
config.RegisterIntConfigVariable(5, &proc.config.transformTimesPQLength, false, 1, "Processor.transformTimesPQLength")
// Capture event name as a tag in event level stats
Expand Down Expand Up @@ -716,6 +715,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
sourceIdDestinationMap = make(map[string][]backendconfig.DestinationT)
sourceIdSourceMap = map[string]backendconfig.SourceT{}
destinationIDtoTypeMap = make(map[string]string)
eventAuditEnabled = false
)
for workspaceID, wConfig := range config {
for i := range wConfig.Sources {
Expand All @@ -731,13 +731,15 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
}
}
workspaceLibrariesMap[workspaceID] = wConfig.Libraries
eventAuditEnabled = wConfig.Settings.EventAuditEnabled
}
proc.config.configSubscriberLock.Lock()
proc.config.destConsentCategories = destConsentCategories
proc.config.workspaceLibrariesMap = workspaceLibrariesMap
proc.config.sourceIdDestinationMap = sourceIdDestinationMap
proc.config.sourceIdSourceMap = sourceIdSourceMap
proc.config.destinationIDtoTypeMap = destinationIDtoTypeMap
proc.config.eventAuditEnabled = eventAuditEnabled
proc.config.configSubscriberLock.Unlock()
if !initDone {
initDone = true
Expand Down Expand Up @@ -1481,8 +1483,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
},
)
if proc.config.eventSchemaV2Enabled && // schemas enabled
// source has schemas enabled or if we override schemas for all sources
(source.EventSchemasEnabled || proc.config.eventSchemaV2AllSources) &&
proc.config.eventAuditEnabled &&
// TODO: could use source.SourceDefinition.Category instead?
commonMetadataFromSingularEvent.SourceJobRunID == "" {
if payload := payloadFunc(); payload != nil {
Expand Down
1 change: 1 addition & 0 deletions schema-forwarder/internal/testdata/configdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,5 +272,6 @@ var SampleBackendConfig = backendconfig.ConfigT{
DataRetention: backendconfig.DataRetention{
DisableReportingPII: true,
},
EventAuditEnabled: false,
},
}

0 comments on commit 38221c0

Please sign in to comment.