From 63a918716e69ca6d07e94a50f5b7b1cd9b8ba85e Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Thu, 21 Nov 2024 13:09:18 +0530 Subject: [PATCH 1/2] fix: json marshal l errors when parsing poll response from klaviyo --- .../klaviyobulkupload/apiService.go | 7 +++++ .../klaviyobulkupload/klaviyobulkupload.go | 6 +++-- .../klaviyobulkupload_test.go | 26 +++++++++++++------ 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go index 34b45de4b2..fbb0ab6c0f 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go @@ -59,6 +59,10 @@ func (k *KlaviyoAPIServiceImpl) UploadProfiles(profiles Payload) (*UploadResp, e if len(uploadResp.Errors) > 0 { return &uploadResp, fmt.Errorf("upload failed with errors: %+v", uploadResp.Errors) } + if uploadResp.Data.Id == "" { + k.logger.Error("[klaviyo bulk upload] upload failed with empty importId", string(uploadBodyBytes)) + return &uploadResp, fmt.Errorf("upload failed with empty importId") + } uploadTimeStat := k.statsFactory.NewTaggedStat("async_upload_time", stats.TimerType, k.statLabels) uploadTimeStat.Since(startTime) @@ -66,6 +70,9 @@ func (k *KlaviyoAPIServiceImpl) UploadProfiles(profiles Payload) (*UploadResp, e } func (k *KlaviyoAPIServiceImpl) GetUploadStatus(importId string) (*PollResp, error) { + if importId == "" { + return nil, fmt.Errorf("importId is empty") + } pollUrl := KlaviyoAPIURL + importId req, err := http.NewRequest("GET", pollUrl, nil) if err != nil { diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go index f3bf3df09a..65da191cdb 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go @@ -106,7 +106,9 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat importStatuses := make(map[string]string) failedImports := make([]string, 0) for _, importId := range importIds { - importStatuses[importId] = "queued" + if importId != "" { + importStatuses[importId] = "queued" + } } for { @@ -304,7 +306,7 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS uploadResp, err := kbu.KlaviyoAPIService.UploadProfiles(combinedPayload) if err != nil { failedJobs = append(failedJobs, importingJobIDs[idx]) - kbu.Logger.Error("Error while uploading profiles", err, uploadResp.Errors) + kbu.Logger.Error("Error while uploading profiles", err, uploadResp.Errors, destinationID) continue } diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go index ecc38caaea..541e808073 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/tidwall/gjson" "go.uber.org/mock/gomock" "github.com/rudderlabs/rudder-go-kit/stats" @@ -31,7 +32,7 @@ var destination = &backendconfig.DestinationT{ Name: "KLAVIYO_BULK_UPLOAD", }, Config: map[string]interface{}{ - "privateApiKey": "1223", + "privateApiKey": "1234", }, Enabled: true, WorkspaceID: "1", @@ -192,13 +193,22 @@ func TestUploadIntegration(t *testing.T) { ImportingJobIDs: []int64{1, 2, 3}, } - output := kbu.Upload(asyncDestStruct) - assert.NotNil(t, output) - assert.Equal(t, destination.ID, output.DestinationID) - assert.Empty(t, output.FailedJobIDs) - assert.Empty(t, output.AbortJobIDs) - assert.Empty(t, output.AbortReason) - assert.NotEmpty(t, output.ImportingJobIDs) + uploadResp := kbu.Upload(asyncDestStruct) + assert.NotNil(t, uploadResp) + assert.Equal(t, destination.ID, uploadResp.DestinationID) + assert.Empty(t, uploadResp.FailedJobIDs) + assert.Empty(t, uploadResp.AbortJobIDs) + assert.Empty(t, uploadResp.AbortReason) + assert.NotEmpty(t, uploadResp.ImportingJobIDs) + assert.NotNil(t, uploadResp.ImportingParameters) + + importId := gjson.GetBytes(uploadResp.ImportingParameters, "importId").String() + pollResp := kbu.Poll(common.AsyncPoll{ImportId: importId}) + assert.NotNil(t, pollResp) + assert.Equal(t, http.StatusOK, pollResp.StatusCode) + assert.True(t, pollResp.Complete) + assert.False(t, pollResp.HasFailed) + assert.False(t, pollResp.HasWarning) } func TestPoll(t *testing.T) { From b9504e3e1da2bf8d2fc309139905e3025259ee8a Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Thu, 21 Nov 2024 17:54:59 +0530 Subject: [PATCH 2/2] fix: payload size limit issue --- .../asyncdestinationmanager/klaviyobulkupload/apiService.go | 2 +- .../klaviyobulkupload/klaviyobulkupload.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go index fbb0ab6c0f..ecab5b37e8 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/apiService.go @@ -135,7 +135,7 @@ func NewKlaviyoAPIService(destination *backendconfig.DestinationT, logger logger statsFactory: statsFactory, statLabels: stats.Tags{ "module": "batch_router", - "destType": destination.Name, + "destType": destination.DestinationDefinition.Name, "destID": destination.ID, }, }, nil diff --git a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go index 65da191cdb..91f1e5f201 100644 --- a/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go +++ b/router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go @@ -22,7 +22,7 @@ import ( const ( BATCHSIZE = 10000 MAXALLOWEDPROFILESIZE = 512000 - MAXPAYLOADSIZE = 4900000 + MAXPAYLOADSIZE = 4600000 IMPORT_ID_SEPARATOR = ":" ) @@ -286,8 +286,8 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS // if profileStructure length is more than 500 kB, throw an error profileStructureJSON, _ := json.Marshal(profileStructure) profileSize := float64(len(profileStructureJSON)) - profileSizeStat.Observe(float64(profileSize)) // Record the size in the histogram - if float64(len(profileStructureJSON)) >= MAXALLOWEDPROFILESIZE { + profileSizeStat.Observe(profileSize) // Record the size in the histogram + if len(profileStructureJSON) >= MAXALLOWEDPROFILESIZE { abortReason = "Error while marshaling profiles. The profile size exceeds Klaviyo's limit of 500 kB for a single profile." abortedJobs = append(abortedJobs, int64(metadata.JobID)) continue