Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: json marshal l errors when parsing poll response from klaviyo #5316

Open
wants to merge 2 commits into
base: release/1.38.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,20 @@
if len(uploadResp.Errors) > 0 {
return &uploadResp, fmt.Errorf("upload failed with errors: %+v", uploadResp.Errors)
}
if uploadResp.Data.Id == "" {
k.logger.Error("[klaviyo bulk upload] upload failed with empty importId", string(uploadBodyBytes))
return &uploadResp, fmt.Errorf("upload failed with empty importId")
}

Check warning on line 65 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go#L62-L65

Added lines #L62 - L65 were not covered by tests
uploadTimeStat := k.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, k.statLabels)
uploadTimeStat.Since(startTime)

return &uploadResp, uploadRespErr
}

func (k *KlaviyoAPIServiceImpl) GetUploadStatus(importId string) (*PollResp, error) {
if importId == "" {
return nil, fmt.Errorf("importId is empty")
}

Check warning on line 75 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go#L73-L75

Added lines #L73 - L75 were not covered by tests
pollUrl := KlaviyoAPIURL + importId
req, err := http.NewRequest("GET", pollUrl, nil)
if err != nil {
Expand Down Expand Up @@ -128,7 +135,7 @@
statsFactory: statsFactory,
statLabels: stats.Tags{
"module": "batch_router",
"destType": destination.Name,
"destType": destination.DestinationDefinition.Name,
"destID": destination.ID,
},
}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
const (
BATCHSIZE = 10000
MAXALLOWEDPROFILESIZE = 512000
MAXPAYLOADSIZE = 4900000
MAXPAYLOADSIZE = 4600000
IMPORT_ID_SEPARATOR = ":"
)

Expand Down Expand Up @@ -106,7 +106,9 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat
importStatuses := make(map[string]string)
failedImports := make([]string, 0)
for _, importId := range importIds {
importStatuses[importId] = "queued"
if importId != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean all the events that were part of unsuccessful upload gets re-tried in next attempt as they don't get marked as queued ?

importStatuses[importId] = "queued"
}
}

for {
Expand Down Expand Up @@ -284,8 +286,8 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS
// if profileStructure length is more than 500 kB, throw an error
profileStructureJSON, _ := json.Marshal(profileStructure)
profileSize := float64(len(profileStructureJSON))
profileSizeStat.Observe(float64(profileSize)) // Record the size in the histogram
if float64(len(profileStructureJSON)) >= MAXALLOWEDPROFILESIZE {
profileSizeStat.Observe(profileSize) // Record the size in the histogram
if len(profileStructureJSON) >= MAXALLOWEDPROFILESIZE {
abortReason = "Error while marshaling profiles. The profile size exceeds Klaviyo's limit of 500 kB for a single profile."
abortedJobs = append(abortedJobs, int64(metadata.JobID))
continue
Expand All @@ -304,7 +306,7 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS
uploadResp, err := kbu.KlaviyoAPIService.UploadProfiles(combinedPayload)
if err != nil {
failedJobs = append(failedJobs, importingJobIDs[idx])
kbu.Logger.Error("Error while uploading profiles", err, uploadResp.Errors)
kbu.Logger.Error("Error while uploading profiles", err, uploadResp.Errors, destinationID)
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"
"go.uber.org/mock/gomock"

"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -31,7 +32,7 @@ var destination = &backendconfig.DestinationT{
Name: "KLAVIYO_BULK_UPLOAD",
},
Config: map[string]interface{}{
"privateApiKey": "1223",
"privateApiKey": "1234",
},
Enabled: true,
WorkspaceID: "1",
Expand Down Expand Up @@ -192,13 +193,22 @@ func TestUploadIntegration(t *testing.T) {
ImportingJobIDs: []int64{1, 2, 3},
}

output := kbu.Upload(asyncDestStruct)
assert.NotNil(t, output)
assert.Equal(t, destination.ID, output.DestinationID)
assert.Empty(t, output.FailedJobIDs)
assert.Empty(t, output.AbortJobIDs)
assert.Empty(t, output.AbortReason)
assert.NotEmpty(t, output.ImportingJobIDs)
uploadResp := kbu.Upload(asyncDestStruct)
assert.NotNil(t, uploadResp)
assert.Equal(t, destination.ID, uploadResp.DestinationID)
assert.Empty(t, uploadResp.FailedJobIDs)
assert.Empty(t, uploadResp.AbortJobIDs)
assert.Empty(t, uploadResp.AbortReason)
assert.NotEmpty(t, uploadResp.ImportingJobIDs)
assert.NotNil(t, uploadResp.ImportingParameters)

importId := gjson.GetBytes(uploadResp.ImportingParameters, "importId").String()
pollResp := kbu.Poll(common.AsyncPoll{ImportId: importId})
assert.NotNil(t, pollResp)
assert.Equal(t, http.StatusOK, pollResp.StatusCode)
assert.True(t, pollResp.Complete)
assert.False(t, pollResp.HasFailed)
assert.False(t, pollResp.HasWarning)
}

func TestPoll(t *testing.T) {
Expand Down
Loading