Commit

feat: onboarding lytics bulk upload (#5000)
* feat: initial commit

* feat: lytics bulk upload initial commit

* fix: removing compilation errors

* fix: fixing flow after dev test

* fix: edits after e2e and adding test cases

* fix: self assignment removed

* fix: adding more test cases

* fix: fixing lint error

* fix: fixing lint error

* fix: removing unnecessary code

* fix: using jsoniter

* fix: using SimpleAsyncDestinationManager

* fix: empty file remove

* fix: code refactoring

* fix: fixing test cases

* fix: fixing lint issues

* fix: test case fix

* fix: test case fix 2

* fix: variable names change
shrouti1507 authored Aug 30, 2024
1 parent d68ceb3 commit 5c38dab
Showing 11 changed files with 811 additions and 2 deletions.
70 changes: 70 additions & 0 deletions mocks/router/lytics_bulk_upload/lytics_bulk_upload_mock.go

Some generated files are not rendered by default.
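The mock is generated with gomock against the package's Lytics service interface. A representative fragment of what such a generated file typically contains (type and interface names here are assumptions, since the generated content is not rendered):

// Representative gomock-generated stub; shape assumed from gomock's
// standard output, since the actual file is not rendered in this view.
type MockLyticsService struct {
    ctrl     *gomock.Controller
    recorder *MockLyticsServiceMockRecorder
}

func NewMockLyticsService(ctrl *gomock.Controller) *MockLyticsService {
    mock := &MockLyticsService{ctrl: ctrl}
    mock.recorder = &MockLyticsServiceMockRecorder{mock}
    return mock
}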

2 changes: 1 addition & 1 deletion router/batchrouter/asyncdestinationmanager/common/utils.go
@@ -3,7 +3,7 @@ package common
import "slices"

var (
-    asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD"}
+    asyncDestinations = []string{"MARKETO_BULK_UPLOAD", "BINGADS_AUDIENCE", "ELOQUA", "YANDEX_METRICA_OFFLINE_EVENTS", "BINGADS_OFFLINE_CONVERSIONS", "KLAVIYO_BULK_UPLOAD", "LYTICS_BULK_UPLOAD"}
     sftpDestinations = []string{"SFTP"}
)

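Registering "LYTICS_BULK_UPLOAD" in this list is what routes the new destination through the async destination manager. A minimal sketch of how the registry is presumably consulted (the file already imports "slices" for membership checks of this kind; the helper name is an assumption, not necessarily the repository's own):

// Hypothetical helper illustrating how the registry would be consulted;
// the actual function in common/utils.go may be named differently.
func IsAsyncDestination(destType string) bool {
    return slices.Contains(asyncDestinations, destType)
}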
@@ -0,0 +1,169 @@
package lyticsBulkUpload

import (
    "fmt"
    "io"
    "net/http"
    "os"
    "time"

    jsoniter "github.com/json-iterator/go"
    "github.com/tidwall/gjson"

    "github.com/rudderlabs/rudder-go-kit/stats"
    "github.com/rudderlabs/rudder-server/jobsdb"
    "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)

type LyticsServiceImpl struct {
    BulkApi string
}

func (u *LyticsServiceImpl) getBulkApi(destConfig DestinationConfig) *LyticsServiceImpl {
    return &LyticsServiceImpl{
        BulkApi: fmt.Sprintf("https://bulk.lytics.io/collect/bulk/%s?timestamp_field=%s", destConfig.LyticsStreamName, destConfig.TimestampField),
    }
}
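DestinationConfig is defined elsewhere in this diff; from the usage above it carries at least LyticsStreamName and TimestampField. For illustration, a hypothetical config with stream name rudder_stream and timestamp field timestamp would yield the endpoint https://bulk.lytics.io/collect/bulk/rudder_stream?timestamp_field=timestamp.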

func (*LyticsBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
    return common.GetMarshalledData(string(job.EventPayload), job.JobID)
}
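Transform delegates to the shared common.GetMarshalledData helper used by the other async destinations; it pairs the raw event payload with the job ID so that each record written to the staging file can be traced back to its originating job.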

// MakeHTTPRequest executes the request described by data and returns the
// response body, the HTTP status code, and any error encountered.
func (u *LyticsServiceImpl) MakeHTTPRequest(data *HttpRequestData) ([]byte, int, error) {
    req, err := http.NewRequest(data.Method, data.Endpoint, data.Body)
    if err != nil {
        return nil, 500, err
    }
    req.Header.Add("Authorization", data.Authorization)
    req.Header.Add("content-type", data.ContentType)
    client := &http.Client{}
    res, err := client.Do(req)
    if err != nil {
        return nil, 500, err
    }
    defer res.Body.Close()

    body, err := io.ReadAll(res.Body)
    if err != nil {
        return nil, 500, err
    }
    return body, res.StatusCode, err
}
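HttpRequestData is defined in a part of the diff not shown here; judging from the field accesses in MakeHTTPRequest and UploadBulkFile, a minimal sketch would look like:

// Sketch of HttpRequestData as inferred from its usage above; the actual
// definition lives in a file not rendered in this excerpt.
type HttpRequestData struct {
    Method        string    // HTTP verb, e.g. http.MethodPost
    Endpoint      string    // fully built Lytics bulk API URL
    ContentType   string    // e.g. "application/csv"
    Authorization string    // Lytics API token for the Authorization header
    Body          io.Reader // request payload, e.g. an *os.File
}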

// UploadBulkFile streams the CSV file at filePath to the Lytics bulk API
// and returns an error unless the upload succeeds with a 200 response.
func (u *LyticsServiceImpl) UploadBulkFile(data *HttpRequestData, filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }

    data.Method = http.MethodPost
    data.ContentType = "application/csv"
    data.Body = file
    _, statusCode, err := u.MakeHTTPRequest(data)
    if err != nil {
        return err
    }
    if statusCode != http.StatusOK {
        return fmt.Errorf("upload failed with status code: %d", statusCode)
    }
    return nil
}
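Because the *os.File implements io.ReadCloser, http.NewRequest uses it directly as the request body and the HTTP client closes it after the round trip, so UploadBulkFile needs no defer file.Close() of its own. A minimal call-site sketch (endpoint, token, and file path are placeholders, not values from this PR):

// Hypothetical call site within the package; all literals are placeholders.
func uploadExample() error {
    svc := &LyticsServiceImpl{}
    reqData := &HttpRequestData{
        Endpoint:      "https://bulk.lytics.io/collect/bulk/my_stream?timestamp_field=timestamp",
        Authorization: "<LYTICS_API_TOKEN>",
    }
    return svc.UploadBulkFile(reqData, "/tmp/lytics_upload.csv")
}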

func (u *LyticsBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
    destination := asyncDestStruct.Destination
    filePath := asyncDestStruct.FileName
    destConfig, err := jsoniter.Marshal(destination.Config)
    if err != nil {
        eventsAbortedStat := stats.Default.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{
            "module":   "batch_router",
            "destType": u.destName,
        })
        eventsAbortedStat.Count(len(asyncDestStruct.ImportingJobIDs))
        return common.AsyncUploadOutput{
            AbortCount:    len(asyncDestStruct.ImportingJobIDs),
            DestinationID: asyncDestStruct.Destination.ID,
            AbortJobIDs:   asyncDestStruct.ImportingJobIDs,
            AbortReason:   fmt.Sprintf("%s %v", "Error while marshalling destination config", err.Error()),
        }
    }
    var failedJobs []int64
    var successJobs []int64

    destConfigJson := string(destConfig)
    streamTraitsMapping := convertGjsonToStreamTraitMapping(gjson.Get(destConfigJson, "streamTraitsMapping"))
    if err != nil {
        return common.AsyncUploadOutput{
            FailedJobIDs:  append(asyncDestStruct.FailedJobIDs, asyncDestStruct.ImportingJobIDs...),
            FailedReason:  fmt.Sprintf("failed to convert streamTraitsMapping: %v", err.Error()),
            FailedCount:   len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs),
            DestinationID: destination.ID,
        }
    }
    actionFiles, err := u.createCSVFile(filePath, streamTraitsMapping)
    if err != nil {
        return common.AsyncUploadOutput{
            FailedJobIDs:  append(asyncDestStruct.FailedJobIDs, asyncDestStruct.ImportingJobIDs...),
            FailedReason:  fmt.Sprintf("got error while transforming the file. %v", err.Error()),
            FailedCount:   len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs),
            DestinationID: destination.ID,
        }
    }
    uploadRetryableStat := stats.Default.NewTaggedStat("events_over_prescribed_limit", stats.CountType, map[string]string{
        "module":   "batch_router",
        "destType": u.destName,
    })

    uploadRetryableStat.Count(len(actionFiles.FailedJobIDs))

    uploadTimeStat := stats.Default.NewTaggedStat("async_upload_time", stats.TimerType, map[string]string{
        "module":   "batch_router",
        "destType": u.destName,
    })

    uploadData := HttpRequestData{
        Endpoint:      u.baseEndpoint,
        Authorization: u.authorization,
    }

    startTime := time.Now()
    errorDuringUpload := u.service.UploadBulkFile(&uploadData, actionFiles.CSVFilePath)
    uploadTimeStat.Since(startTime)

    if errorDuringUpload != nil {
        u.logger.Errorf("error in uploading the bulk file: %v", errorDuringUpload)
        failedJobs = append(append(failedJobs, actionFiles.SuccessfulJobIDs...), actionFiles.FailedJobIDs...)
        // remove the file that could not be uploaded
        err = os.Remove(actionFiles.CSVFilePath)
        if err != nil {
            return common.AsyncUploadOutput{
                FailedJobIDs:  append(asyncDestStruct.FailedJobIDs, asyncDestStruct.ImportingJobIDs...),
                FailedReason:  fmt.Sprintf("error in removing CSV file: %v", err.Error()),
                FailedCount:   len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs),
                DestinationID: destination.ID,
            }
        } else {
            return common.AsyncUploadOutput{
                FailedJobIDs:  failedJobs,
                FailedReason:  fmt.Sprintf("error in uploading the bulk file: %v", errorDuringUpload.Error()),
                FailedCount:   len(asyncDestStruct.FailedJobIDs) + len(asyncDestStruct.ImportingJobIDs),
                DestinationID: destination.ID,
            }
        }
    }

    failedJobs = append(failedJobs, actionFiles.FailedJobIDs...)
    successJobs = append(successJobs, actionFiles.SuccessfulJobIDs...)

    err = os.Remove(actionFiles.CSVFilePath)
    if err != nil {
        u.logger.Errorf("error in removing CSV file: %v", err)
    }

    return common.AsyncUploadOutput{
        ImportingJobIDs: successJobs,
        FailedJobIDs:    append(asyncDestStruct.FailedJobIDs, failedJobs...),
        ImportingCount:  len(successJobs),
        FailedCount:     len(asyncDestStruct.FailedJobIDs) + len(failedJobs),
        DestinationID:   destination.ID,
    }
}
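The LyticsBulkUploader receiver is defined in a part of the diff not shown here; from the field accesses in Upload and Transform, a plausible sketch is:

// Sketch of the receiver type as inferred from the methods above; the actual
// definition and constructor live in files not rendered in this excerpt, and
// the service interface name is an assumption.
type LyticsBulkUploader struct {
    destName      string        // destination type, "LYTICS_BULK_UPLOAD", used in stat tags
    logger        logger.Logger // rudder-go-kit logger
    authorization string        // Lytics API token sent in the Authorization header
    baseEndpoint  string        // bulk endpoint produced by getBulkApi
    service       LyticsService // interface satisfied by LyticsServiceImpl (and its gomock mock)
}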
@@ -0,0 +1,13 @@
package lyticsBulkUpload_test

import (
    "testing"

    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
)

func TestLyticsbulkupload(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "lytics_bulk_upload Suite")
}
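This bootstrap hands the Go test runner over to Ginkgo; the specs added by this PR register with the same suite from sibling _test.go files. A minimal hypothetical spec for illustration (assumes the usual imports, net/http, net/http/httptest, os, plus the package under test; not one of the PR's actual test cases):

// Hypothetical example spec exercising UploadBulkFile against a stub server.
var _ = Describe("LyticsServiceImpl.UploadBulkFile", func() {
    It("returns an error when the server responds with a non-200 status", func() {
        // Stub server that always rejects the upload.
        srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
            w.WriteHeader(http.StatusBadRequest)
        }))
        defer srv.Close()

        f, err := os.CreateTemp("", "lytics-*.csv")
        Expect(err).NotTo(HaveOccurred())
        defer os.Remove(f.Name())
        Expect(f.Close()).To(Succeed())

        svc := &lyticsBulkUpload.LyticsServiceImpl{}
        err = svc.UploadBulkFile(&lyticsBulkUpload.HttpRequestData{Endpoint: srv.URL}, f.Name())
        Expect(err).To(HaveOccurred())
    })
})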