Skip to content

Commit

Permalink
chore: adding failed seed data files
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 21, 2024
1 parent a1fd52e commit 14aa043
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 52 deletions.
22 changes: 3 additions & 19 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2973,10 +2973,9 @@ func (proc *Handle) responsesDiffer(
}

var (
differedSampleEvents []types.SingularEventT
differedEventsCount int
collectedSampleEvent bool
collectedSampleFailedEvent bool
differedSampleEvents []types.SingularEventT
differedEventsCount int
collectedSampleEvent bool
)

for i := range pResponse.Events {
Expand All @@ -2991,21 +2990,6 @@ func (proc *Handle) responsesDiffer(
}
}
}
for i := range pResponse.FailedEvents {
wResponse.FailedEvents[i].Error = pResponse.FailedEvents[i].Error // Ensure errors match

if !reflect.DeepEqual(pResponse.FailedEvents[i], wResponse.FailedEvents[i]) {
differedEventsCount++
if !collectedSampleFailedEvent {
// Collect the mismatched messages and break (sample only)
differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.FailedEvents[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
})...)
collectedSampleFailedEvent = true
}
}
}

proc.statsFactory.NewStat("proc_warehouse_transformations_mismatches", stats.CountType).Count(differedEventsCount)

return differedSampleEvents
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"tYpe\":\"00000\"}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"type\":\"pAge\",\"messageId\":\"0\",\"receivedAt\":\"0000-01-01T0:00:00Z\"}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"type\":\"pAge\"}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("\"\"")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"type\":{}}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"tYpe\":\"trACk\"}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"type\":\"AliAs\",\"messageId\":\"0001-01-01\",\"receivedAt\":\"0000-01-01T0:00:00Z\"}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"tYpe\":\"track\"}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
go test fuzz v1
string("{\"type\":\"pAge\",\"messageId\":{}}")
46 changes: 29 additions & 17 deletions warehouse/transformer/testhelper/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,33 @@ func validateResponseLengths(t *testing.T, expectedResponse, pResponse, wRespons

func validateRudderEventIfExists(t *testing.T, expectedResponse, pResponse, wResponse ptrans.Response) {
for i := range pResponse.Events {
data := expectedResponse.Events[i].Output["data"]
if data != nil && data.(map[string]any)["rudder_event"] != nil {
expectedRudderEvent := expectedResponse.Events[i].Output["data"].(map[string]any)["rudder_event"].(string)
require.JSONEq(t, expectedRudderEvent, pResponse.Events[i].Output["data"].(map[string]any)["rudder_event"].(string))
require.JSONEq(t, expectedRudderEvent, wResponse.Events[i].Output["data"].(map[string]any)["rudder_event"].(string))
require.JSONEq(t, wResponse.Events[i].Output["data"].(map[string]any)["rudder_event"].(string), pResponse.Events[i].Output["data"].(map[string]any)["rudder_event"].(string))

// Clean up rudder_event key after comparison
delete(pResponse.Events[i].Output["data"].(map[string]any), "rudder_event")
delete(wResponse.Events[i].Output["data"].(map[string]any), "rudder_event")
delete(expectedResponse.Events[i].Output["data"].(map[string]any), "rudder_event")
data, ok := expectedResponse.Events[i].Output["data"].(map[string]interface{})
if !ok {
continue // No data to validate
}

rudderEvent, ok := data["rudder_event"].(string)
if !ok {
continue // No rudder_event key, skip validation
}

pEventData, ok := pResponse.Events[i].Output["data"].(map[string]interface{})
require.True(t, ok, "pResponse data must be a map")
pRudderEvent, ok := pEventData["rudder_event"].(string)
require.True(t, ok, "pResponse rudder_event must be a string")
require.JSONEq(t, rudderEvent, pRudderEvent)

wEventData, ok := wResponse.Events[i].Output["data"].(map[string]interface{})
require.True(t, ok, "wResponse data must be a map")
wRudderEvent, ok := wEventData["rudder_event"].(string)
require.True(t, ok, "wResponse rudder_event must be a string")
require.JSONEq(t, rudderEvent, wRudderEvent)

require.JSONEq(t, pRudderEvent, wRudderEvent)

delete(pEventData, "rudder_event")
delete(wEventData, "rudder_event")
delete(data, "rudder_event")
}
}

Expand All @@ -89,11 +104,8 @@ func validateFailedEventEquality(t *testing.T, expectedResponse, pResponse, wRes
require.NotEmpty(t, wResponse.FailedEvents[i].Error)
require.NotEmpty(t, expectedResponse.FailedEvents[i].Error)

expectedResponse.FailedEvents[i].Error = pResponse.FailedEvents[i].Error
wResponse.FailedEvents[i].Error = pResponse.FailedEvents[i].Error

require.EqualValues(t, expectedResponse.FailedEvents[i], pResponse.FailedEvents[i])
require.EqualValues(t, expectedResponse.FailedEvents[i], wResponse.FailedEvents[i])
require.EqualValues(t, wResponse.FailedEvents[i], pResponse.FailedEvents[i])
require.NotZero(t, pResponse.FailedEvents[i].StatusCode)
require.NotZero(t, wResponse.FailedEvents[i].StatusCode)
require.NotZero(t, expectedResponse.FailedEvents[i].StatusCode)
}
}
41 changes: 25 additions & 16 deletions warehouse/transformer/transformer_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -451,16 +452,27 @@ func FuzzTransformer(f *testing.F) {
}

f.Fuzz(func(t *testing.T, payload string) {
payload, err := sanitizeJSON(payload)
sanitizedPayload, err := sanitizePayload(payload)
if err != nil {
return
}

eventType := gjson.Get(payload, "type").String()
eventName := gjson.Get(payload, "event").String()
messageID := gjson.Get(payload, "messageId").String()
receivedAt := gjson.Get(payload, "receivedAt").Time()
recordID := gjson.Get(payload, "recordId").Value()
var (
eventType = gjson.Get(sanitizedPayload, "type").String()
eventName = gjson.Get(sanitizedPayload, "event").String()
messageID = gjson.Get(sanitizedPayload, "messageId").String()
receivedAt = gjson.Get(sanitizedPayload, "receivedAt").Time()
recordID = gjson.Get(sanitizedPayload, "recordId").Value()
)

if len(messageID) == 0 || receivedAt.IsZero() {
return
}

sanitizedPayload, err = sjson.Set(sanitizedPayload, "receivedAt", receivedAt.Format(misc.RFC3339Milli))
if err != nil {
return
}

c := setupConfig(transformerResource, map[string]any{})

Expand All @@ -469,13 +481,13 @@ func FuzzTransformer(f *testing.F) {

for _, destType := range whutils.WarehouseDestinations {
destConfig := map[string]any{}
for k, v := range destConfigOpts[len(payload)%len(destConfigOpts)] {
for k, v := range destConfigOpts[len(sanitizedPayload)%len(destConfigOpts)] {
destConfig[k] = v
}

eventsInfos := []testhelper.EventInfo{
{
Payload: []byte(payload),
Payload: []byte(sanitizedPayload),
Metadata: ptrans.Metadata{
EventType: eventType,
EventName: eventName,
Expand Down Expand Up @@ -534,27 +546,24 @@ func cmpEvents(t *testing.T, infos []testhelper.EventInfo, pTransformer, dTransf
require.NotEmpty(t, pResponse.FailedEvents[i].Error)
require.NotEmpty(t, wResponse.FailedEvents[i].Error)

wResponse.FailedEvents[i].Error = pResponse.FailedEvents[i].Error

require.EqualValues(t, wResponse.FailedEvents[i], pResponse.FailedEvents[i])
require.NotZero(t, pResponse.FailedEvents[i].StatusCode)
require.NotZero(t, wResponse.FailedEvents[i].StatusCode)
}
}

func sanitizeJSON(input string) (string, error) {
func sanitizePayload(input string) (string, error) {
sanitized := strings.ReplaceAll(input, `\u0000`, "")

if len(strings.TrimSpace(sanitized)) == 0 {
return "{}", nil
}

var result any
var result types.SingularEventT
if err := json.Unmarshal([]byte(sanitized), &result); err != nil {
return "", errors.New("invalid JSON format")
}

output, err := json.Marshal(result)
if err != nil {
return "", err
return "", fmt.Errorf("marshalling error: %w", err)
}
return string(output), nil
}

0 comments on commit 14aa043

Please sign in to comment.