Skip to content

Commit

Permalink
chore: applying 1.15.3 hotfixes to main branch (#4000)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum authored Oct 20, 2023
1 parent 10c0ffe commit e74cd7d
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 29 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## [1.15.3](https://github.com/rudderlabs/rudder-server/compare/v1.15.2...v1.15.3) (2023-10-19)


### Bug Fixes

* corrupted rsources stats captured by processor for dropped jobs ([#3999](https://github.com/rudderlabs/rudder-server/issues/3999)) ([e7b829d](https://github.com/rudderlabs/rudder-server/commit/e7b829d0565dcba3b902208c30727c7a23a6c2e8))
* update error parsing of eloqua ([#3996](https://github.com/rudderlabs/rudder-server/issues/3996)) ([978c292](https://github.com/rudderlabs/rudder-server/commit/978c292605b2cafa8b74408ff8cd1959b3b59503))

## [1.15.2](https://github.com/rudderlabs/rudder-server/compare/v1.15.1...v1.15.2) (2023-10-18)


Expand Down
9 changes: 6 additions & 3 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2573,10 +2573,13 @@ func (proc *Handle) transformSrcDest(
}
}

func (proc *Handle) saveDroppedJobs(failedJobs []*jobsdb.JobT, tx *jobsdb.Tx) error {
if len(failedJobs) > 0 {
func (proc *Handle) saveDroppedJobs(droppedJobs []*jobsdb.JobT, tx *jobsdb.Tx) error {
if len(droppedJobs) > 0 {
for i := range droppedJobs { // each dropped job should have a unique jobID in the scope of the batch
droppedJobs[i].JobID = int64(i)
}
rsourcesStats := rsources.NewDroppedJobsCollector(proc.rsourcesService)
rsourcesStats.JobsDropped(failedJobs)
rsourcesStats.JobsDropped(droppedJobs)
return rsourcesStats.Publish(context.TODO(), tx.Tx)
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions router/batchrouter/asyncdestinationmanager/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ func GetBatchRouterConfigInt64(key, destType string, defaultValue int64) int64 {
return config.GetInt64("BatchRouter."+key, defaultValue)
}

func GetBatchRouterConfigBool(key, destType string, defaultValue bool) bool {
func GetBatchRouterConfigStringMap(key, destType string, defaultValue []string) []string {
destOverrideFound := config.IsSet("BatchRouter." + destType + "." + key)
if destOverrideFound {
return config.GetBool("BatchRouter."+destType+"."+key, defaultValue)
return config.GetStringSlice("BatchRouter."+destType+"."+key, defaultValue)
}
return config.GetBool("BatchRouter."+key, defaultValue)
return config.GetStringSlice("BatchRouter."+key, defaultValue)
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func (b *EloquaBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStru
return b.createAsyncUploadErrorOutput("got error while fetching fields. ", err, destination.ID, asyncDestStruct)
}

uniqueKeys := getUniqueKeys(eloquaFields)
b.uniqueKeys = uniqueKeys
uploadJobInfo := JobInfo{
fileSizeLimit: b.fileSizeLimit,
importingJobs: asyncDestStruct.ImportingJobIDs,
Expand Down Expand Up @@ -227,7 +229,7 @@ func (b *EloquaBulkUploader) GetUploadStats(UploadStatsInput common.GetUploadSta
DynamicPart: UploadStatsInput.WarningJobURLs,
Authorization: b.authorization,
}
eventStatMetaWithRejectedSucceededJobs, err := parseRejectedData(&checkRejectedData, UploadStatsInput.ImportingList, b.service, b.jobToCSVMap)
eventStatMetaWithRejectedSucceededJobs, err := parseRejectedData(&checkRejectedData, UploadStatsInput.ImportingList, b)
if err != nil {
b.logger.Error("Error while parsing rejected data", err)
return common.GetUploadStatsResponse{
Expand Down
17 changes: 9 additions & 8 deletions router/batchrouter/asyncdestinationmanager/eloqua/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ func NewManager(destination *backendconfig.DestinationT) (*EloquaBulkUploader, e

func NewEloquaBulkUploader(destinationName, authorization, baseEndpoint string, eloqua EloquaService) *EloquaBulkUploader {
return &EloquaBulkUploader{
destName: destinationName,
logger: logger.NewLogger().Child("batchRouter").Child("AsyncDestinationManager").Child("Eloqua").Child("EloquaBulkUploader"),
authorization: authorization,
baseEndpoint: baseEndpoint,
fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destinationName, 32*bytesize.MB),
eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destinationName, 1000000),
service: eloqua,
jobToCSVMap: map[int64]int64{},
destName: destinationName,
logger: logger.NewLogger().Child("batchRouter").Child("AsyncDestinationManager").Child("Eloqua").Child("EloquaBulkUploader"),
authorization: authorization,
baseEndpoint: baseEndpoint,
fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destinationName, 32*bytesize.MB),
eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destinationName, 1000000),
successStatusCode: common.GetBatchRouterConfigStringMap("SuccessStatusCode", destinationName, []string{"ELQ-00040"}),
service: eloqua,
jobToCSVMap: map[int64]int64{},
}
}
18 changes: 10 additions & 8 deletions router/batchrouter/asyncdestinationmanager/eloqua/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ type EloquaService interface {
}

type EloquaBulkUploader struct {
destName string
logger logger.Logger
authorization string
baseEndpoint string
fileSizeLimit int64
eventsLimit int64
service EloquaService
jobToCSVMap map[int64]int64
destName string
logger logger.Logger
authorization string
baseEndpoint string
fileSizeLimit int64
eventsLimit int64
service EloquaService
jobToCSVMap map[int64]int64
uniqueKeys []string
successStatusCode []string
}
type DestinationConfig struct {
CompanyName string `json:"companyName"`
Expand Down
32 changes: 26 additions & 6 deletions router/batchrouter/asyncdestinationmanager/eloqua/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ func createCSVFile(fields []string, file *os.File, uploadJobInfo *JobInfo, jobId
}
var values []string
for _, field := range fields {
values = append(values, data.Message.Data[field].(string))
val, ok := data.Message.Data[field].(string)
if !ok || val == "null" {
values = append(values, "")
} else {
values = append(values, val)
}
}
fileInfo, err := csvFile.Stat()
if err != nil {
Expand Down Expand Up @@ -158,12 +163,12 @@ func generateErrorString(item RejectedItem) string {
return item.StatusCode + " : " + item.Message + " " + invalidItems
}

func parseRejectedData(data *HttpRequestData, importingList []*jobsdb.JobT, service EloquaService, jobToCSVMap map[int64]int64) (*common.EventStatMeta, error) {
func parseRejectedData(data *HttpRequestData, importingList []*jobsdb.JobT, eloqua *EloquaBulkUploader) (*common.EventStatMeta, error) {
jobIDs := []int64{}
for _, job := range importingList {
jobIDs = append(jobIDs, job.JobID)
}
rejectResponse, err := service.CheckRejectedData(data)
rejectResponse, err := eloqua.service.CheckRejectedData(data)
if err != nil {
return nil, err
}
Expand All @@ -179,14 +184,19 @@ func parseRejectedData(data *HttpRequestData, importingList []*jobsdb.JobT, serv
offset = i * 1000
data.Offset = offset
if offset != 0 {
rejectResponse, err = service.CheckRejectedData(data)
rejectResponse, err = eloqua.service.CheckRejectedData(data)
if err != nil {
return nil, err
}
}
for _, val := range rejectResponse.Items {
failedJobIDs = append(failedJobIDs, jobToCSVMap[val.RecordIndex])
failedReasons[jobToCSVMap[val.RecordIndex]] = generateErrorString(val)
uniqueInvalidFields := lo.Intersect(eloqua.uniqueKeys, val.InvalidFields)
successStatusCode := lo.Intersect(eloqua.successStatusCode, []string{val.StatusCode})
if len(successStatusCode) != 0 && len(uniqueInvalidFields) == 0 {
continue
}
failedJobIDs = append(failedJobIDs, eloqua.jobToCSVMap[val.RecordIndex])
failedReasons[eloqua.jobToCSVMap[val.RecordIndex]] = generateErrorString(val)
}
}
}
Expand Down Expand Up @@ -214,3 +224,13 @@ func parseFailedData(syncId string, importingList []*jobsdb.JobT) *common.EventS
}
return &eventStatMeta
}

func getUniqueKeys(eloquaFields *Fields) []string {
uniqueKeys := []string{}
for _, item := range eloquaFields.Items {
if item.HasUniquenessConstraint {
uniqueKeys = append(uniqueKeys, item.InternalName)
}
}
return uniqueKeys
}

0 comments on commit e74cd7d

Please sign in to comment.