Skip to content

Commit

Permalink
feat: support reporting pii filter based on config backend (#2655)
Browse files Browse the repository at this point in the history
Co-authored-by: Pavan Chaithanya <pavan@rudderstack.com>
  • Loading branch information
chandumlg and BonapartePC authored Nov 7, 2022
1 parent 1c49e96 commit ef95eba
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 25 deletions.
7 changes: 4 additions & 3 deletions config/backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ type Settings struct {
}

type DataRetention struct {
UseSelfStorage bool `json:"useSelfStorage"`
StorageBucket StorageBucket `json:"storageBucket"`
StoragePreferences StoragePreferences `json:"storagePreferences"`
DisableReportingPII bool `json:"disableReportingPii"`
UseSelfStorage bool `json:"useSelfStorage"`
StorageBucket StorageBucket `json:"storageBucket"`
StoragePreferences StoragePreferences `json:"storagePreferences"`
}

type StorageBucket struct {
Expand Down
1 change: 0 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ BackendConfig:
Regulations:
pageSize: 50
pollInterval: 300s
useHostedBackendConfig: true
recovery:
enabled: true
errorStorePath: /tmp/error_store.json
Expand Down
4 changes: 4 additions & 0 deletions enterprise/reporting/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ func (*NOOP) AddClient(_ context.Context, _ types.Config) {
func (*NOOP) GetClient(_ string) *types.Client {
return nil
}

func (*NOOP) IsPIIReportingDisabled(_ string) bool {
return false
}
64 changes: 44 additions & 20 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,20 @@ const (
)

type HandleT struct {
clients map[string]*types.Client
clientsMapLock sync.RWMutex
logger logger.Logger
reportingServiceURL string
namespace string
workspaceID string
instanceID string
workspaceIDForSourceIDMap map[string]string
workspaceIDForSourceIDMapLock sync.RWMutex
whActionsOnly bool
sleepInterval time.Duration
mainLoopSleepInterval time.Duration
init chan struct{}
onceInit sync.Once
clients map[string]*types.Client
clientsMapLock sync.RWMutex
logger logger.Logger
reportingServiceURL string
namespace string
workspaceID string
instanceID string
workspaceIDForSourceIDMap map[string]string
piiReportingSettings map[string]bool
whActionsOnly bool
sleepInterval time.Duration
mainLoopSleepInterval time.Duration

getMinReportedAtQueryTime stats.Measurement
getReportsQueryTime stats.Measurement
Expand All @@ -76,12 +78,14 @@ func NewFromEnvConfig() *HandleT {
}

return &HandleT{
init: make(chan struct{}),
logger: reportingLogger,
clients: make(map[string]*types.Client),
reportingServiceURL: reportingServiceURL,
namespace: config.GetKubeNamespace(),
instanceID: config.GetString("INSTANCE_ID", "1"),
workspaceIDForSourceIDMap: make(map[string]string),
piiReportingSettings: make(map[string]bool),
whActionsOnly: whActionsOnly,
sleepInterval: sleepInterval,
mainLoopSleepInterval: mainLoopSleepInterval,
Expand All @@ -95,28 +99,35 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) {

for beconfig := range ch {
config := beconfig.Data.(map[string]backendconfig.ConfigT)
handle.workspaceIDForSourceIDMapLock.Lock()
newWorkspaceIDForSourceIDMap := make(map[string]string)
newPIIReportingSettings := make(map[string]bool)
var newWorkspaceID string

for workspaceID, wConfig := range config {
newWorkspaceID = workspaceID
for _, source := range wConfig.Sources {
newWorkspaceIDForSourceIDMap[source.ID] = workspaceID
}
newPIIReportingSettings[workspaceID] = wConfig.Settings.DataRetention.DisableReportingPII
}
if len(config) > 1 {
newWorkspaceID = ""
}
handle.workspaceID = newWorkspaceID
handle.workspaceIDForSourceIDMap = newWorkspaceIDForSourceIDMap
handle.workspaceIDForSourceIDMapLock.Unlock()
handle.piiReportingSettings = newPIIReportingSettings
handle.onceInit.Do(func() {
close(handle.init)
})
}

handle.onceInit.Do(func() {
close(handle.init)
})
}

func (handle *HandleT) getWorkspaceID(sourceID string) string {
handle.workspaceIDForSourceIDMapLock.RLock()
defer handle.workspaceIDForSourceIDMapLock.RUnlock()
<-handle.init
return handle.workspaceIDForSourceIDMap[sourceID]
}

Expand Down Expand Up @@ -435,7 +446,7 @@ func isMetricPosted(status int) bool {
}

func getPIIColumnsToExclude() []string {
piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", ""), ",")
piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", "sample_event,sample_response"), ",")
for i := range piiColumnsToExclude {
piiColumnsToExclude[i] = strings.Trim(piiColumnsToExclude[i], " ")
}
Expand All @@ -444,16 +455,26 @@ func getPIIColumnsToExclude() []string {

func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) types.PUReportedMetric {
for _, col := range piiColumns {
if col == "sample_event" {
switch col {
case "sample_event":
metric.StatusDetail.SampleEvent = []byte(`{}`)
} else if col == "sample_response" {
case "sample_response":
metric.StatusDetail.SampleResponse = ""
case "event_name":
metric.StatusDetail.EventName = ""
case "event_type":
metric.StatusDetail.EventType = ""
}
}

return metric
}

func (handle *HandleT) IsPIIReportingDisabled(workspaceID string) bool {
<-handle.init
return handle.piiReportingSettings[workspaceID]
}

func (handle *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
if len(metrics) == 0 {
return
Expand All @@ -469,7 +490,10 @@ func (handle *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
reportedAt := time.Now().UTC().Unix() / 60
for _, metric := range metrics {
workspaceID := handle.getWorkspaceID(metric.ConnectionDetails.SourceID)
metric := transformMetricForPII(*metric, getPIIColumnsToExclude())
metric := *metric
if handle.IsPIIReportingDisabled(workspaceID) {
metric = transformMetricForPII(metric, getPIIColumnsToExclude())
}

_, err = stmt.Exec(workspaceID, handle.namespace, handle.instanceID, metric.ConnectionDetails.SourceDefinitionId, metric.ConnectionDetails.SourceCategory, metric.ConnectionDetails.SourceID, metric.ConnectionDetails.DestinationDefinitionId, metric.ConnectionDetails.DestinationID, metric.ConnectionDetails.SourceBatchID, metric.ConnectionDetails.SourceTaskID, metric.ConnectionDetails.SourceTaskRunID, metric.ConnectionDetails.SourceJobID, metric.ConnectionDetails.SourceJobRunID, metric.PUDetails.InPU, metric.PUDetails.PU, reportedAt, metric.StatusDetail.Status, metric.StatusDetail.Count, metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU, metric.StatusDetail.StatusCode, metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), metric.StatusDetail.EventName, metric.StatusDetail.EventType)
if err != nil {
Expand Down
78 changes: 77 additions & 1 deletion enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package reporting

import (
"context"
"sync"
"testing"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
mock_backendconfig "github.com/rudderlabs/rudder-server/mocks/config/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -31,6 +39,8 @@ var _ = Describe("Reporting", func() {
StatusCode: 0,
SampleResponse: `{"some-sample-response-key": "some-sample-response-value"}`,
SampleEvent: []byte(`{"some-sample-event-key": "some-sample-event-value"}`),
EventName: "some-event-name",
EventType: "some-event-type",
},
}

Expand All @@ -56,10 +66,12 @@ var _ = Describe("Reporting", func() {
StatusCode: 0,
SampleResponse: "",
SampleEvent: []byte(`{}`),
EventName: "",
EventType: "",
},
}

piiColumnsToExclude := []string{"sample_response", "sample_event"}
piiColumnsToExclude := []string{"sample_response", "sample_event", "event_name", "event_type"}
transformedMetric := transformMetricForPII(inputMetric, piiColumnsToExclude)
assertReportMetric(expectedResponse, transformedMetric)
})
Expand All @@ -83,4 +95,68 @@ func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) {
Expect(expectedMetric.StatusDetail.Count).To(Equal(actualMetric.StatusDetail.Count))
Expect(expectedMetric.StatusDetail.SampleResponse).To(Equal(actualMetric.StatusDetail.SampleResponse))
Expect(expectedMetric.StatusDetail.SampleEvent).To(Equal(actualMetric.StatusDetail.SampleEvent))
Expect(expectedMetric.StatusDetail.EventName).To(Equal(actualMetric.StatusDetail.EventName))
Expect(expectedMetric.StatusDetail.EventType).To(Equal(actualMetric.StatusDetail.EventType))
}

func TestReportingBasedOnConfigBackend(t *testing.T) {
RegisterTestingT(t)
ctrl := gomock.NewController(t)
config := mock_backendconfig.NewMockBackendConfig(ctrl)

configCh := make(chan pubsub.DataEvent)

var ready sync.WaitGroup
ready.Add(2)

var reportingSettings sync.WaitGroup
reportingSettings.Add(1)

config.EXPECT().Subscribe(
gomock.Any(),
gomock.Eq(backendconfig.TopicBackendConfig),
).DoAndReturn(func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel {
ready.Done()
go func() {
<-ctx.Done()
close(configCh)
}()

return configCh
})

f := &Factory{
EnterpriseToken: "dummy-token",
}
f.Setup(config)
reporting := f.GetReportingInstance()

var reportingDisabled bool

go func() {
ready.Done()
reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1")
reportingSettings.Done()
}()

// When the config backend has not published any event yet
ready.Wait()
Expect(reportingDisabled).To(BeFalse())

configCh <- pubsub.DataEvent{
Data: map[string]backendconfig.ConfigT{
"testWorkspaceId-1": {
WorkspaceID: "testWorkspaceId-1",
Settings: backendconfig.Settings{
DataRetention: backendconfig.DataRetention{
DisableReportingPII: true,
},
},
},
},
Topic: string(backendconfig.TopicBackendConfig),
}

reportingSettings.Wait()
Expect(reportingDisabled).To(BeTrue())
}
14 changes: 14 additions & 0 deletions mocks/utils/types/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type ReportingI interface {
WaitForSetup(ctx context.Context, clientName string) error
AddClient(ctx context.Context, c Config)
Report(metrics []*PUReportedMetric, txn *sql.Tx)
IsPIIReportingDisabled(string) bool
}

// ConfigT simple map config structure
Expand Down

0 comments on commit ef95eba

Please sign in to comment.