Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added flags for event audit #3859

Merged
merged 16 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func (c *ConfigT) DestinationsMap() map[string]*DestinationT {
}

type Settings struct {
DataRetention DataRetention `json:"dataRetention"`
DataRetention DataRetention `json:"dataRetention"`
EventAuditEnabled bool `json:"eventAuditEnabled"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we verified that data is unmarshalling into this struct automatically ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

}

type DataRetention struct {
Expand Down
11 changes: 6 additions & 5 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ type Handle struct {
asyncInit *misc.AsyncInit
eventSchemaV2Enabled bool
archivalEnabled bool
eventSchemaV2AllSources bool
eventAuditEnabled map[string]bool
}

adaptiveLimit func(int64) int64
Expand Down Expand Up @@ -590,7 +590,6 @@ func (proc *Handle) loadConfig() {
// EventSchemas feature. false by default
proc.config.enableEventSchemasFeature = config.GetBoolVar(false, "EventSchemas.enableEventSchemasFeature")
proc.config.eventSchemaV2Enabled = config.GetBoolVar(false, "EventSchemas2.enabled")
proc.config.eventSchemaV2AllSources = config.GetBoolVar(false, "EventSchemas2.enableAllSources")
proc.config.batchDestinations = misc.BatchDestinations()
proc.config.transformTimesPQLength = config.GetIntVar(5, 1, "Processor.transformTimesPQLength")
proc.config.transformerURL = config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
Expand Down Expand Up @@ -725,7 +724,9 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
sourceIdDestinationMap = make(map[string][]backendconfig.DestinationT)
sourceIdSourceMap = map[string]backendconfig.SourceT{}
destinationIDtoTypeMap = make(map[string]string)
eventAuditEnabled = make(map[string]bool)
)
proc.config.configSubscriberLock.Lock()
atzoum marked this conversation as resolved.
Show resolved Hide resolved
for workspaceID, wConfig := range config {
for i := range wConfig.Sources {
source := &wConfig.Sources[i]
Expand All @@ -740,13 +741,14 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
}
}
workspaceLibrariesMap[workspaceID] = wConfig.Libraries
eventAuditEnabled[workspaceID] = wConfig.Settings.EventAuditEnabled
}
proc.config.configSubscriberLock.Lock()
proc.config.destConsentCategories = destConsentCategories
proc.config.workspaceLibrariesMap = workspaceLibrariesMap
proc.config.sourceIdDestinationMap = sourceIdDestinationMap
proc.config.sourceIdSourceMap = sourceIdSourceMap
proc.config.destinationIDtoTypeMap = destinationIDtoTypeMap
proc.config.eventAuditEnabled = eventAuditEnabled
proc.config.configSubscriberLock.Unlock()
if !initDone {
initDone = true
Expand Down Expand Up @@ -1490,8 +1492,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
},
)
if proc.config.eventSchemaV2Enabled && // schemas enabled
// source has schemas enabled or if we override schemas for all sources
(source.EventSchemasEnabled || proc.config.eventSchemaV2AllSources) &&
atzoum marked this conversation as resolved.
Show resolved Hide resolved
proc.config.eventAuditEnabled[batchEvent.WorkspaceId] &&
atzoum marked this conversation as resolved.
Show resolved Hide resolved
kanishkkatara marked this conversation as resolved.
Show resolved Hide resolved
// TODO: could use source.SourceDefinition.Category instead?
commonMetadataFromSingularEvent.SourceJobRunID == "" {
if payload := payloadFunc(); payload != nil {
Expand Down
12 changes: 10 additions & 2 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
},
},
Settings: backendconfig.Settings{
EventAuditEnabled: true,
},
}

func initProcessor() {
Expand Down Expand Up @@ -559,6 +562,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() {
EventCount: 1,
LastJobStatus: jobsdb.JobStatusT{},
Parameters: createBatchParameters(SourceIDEnabled),
WorkspaceId: sampleWorkspaceID,
},
{
UUID: uuid.New(),
Expand All @@ -578,6 +582,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() {
EventCount: 2,
LastJobStatus: jobsdb.JobStatusT{},
Parameters: createBatchParameters(SourceIDEnabledNoUT),
WorkspaceId: sampleWorkspaceID,
},
{
UUID: uuid.New(),
Expand All @@ -589,6 +594,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() {
EventCount: 1,
LastJobStatus: jobsdb.JobStatusT{},
Parameters: createBatchParameters(SourceIDEnabled),
WorkspaceId: sampleWorkspaceID,
},
{
UUID: uuid.New(),
Expand All @@ -600,6 +606,7 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() {
EventCount: 1,
LastJobStatus: jobsdb.JobStatusT{},
Parameters: createBatchParameters(SourceIDEnabled),
WorkspaceId: sampleWorkspaceID,
},
{
UUID: uuid.New(),
Expand All @@ -617,8 +624,9 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() {
},
createMessagePayload,
),
EventCount: 3,
Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT),
EventCount: 3,
Parameters: createBatchParametersWithSources(SourceIDEnabledNoUT),
WorkspaceId: sampleWorkspaceID,
},
}
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
Expand Down
1 change: 1 addition & 0 deletions schema-forwarder/internal/testdata/configdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,5 +272,6 @@ var SampleBackendConfig = backendconfig.ConfigT{
DataRetention: backendconfig.DataRetention{
DisableReportingPII: true,
},
EventAuditEnabled: false,
},
}
Loading