diff --git a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/apiService.go b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/apiService.go index a93eab7113..8d486d0fdb 100644 --- a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/apiService.go +++ b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/apiService.go @@ -93,6 +93,9 @@ func (m *MarketoAPIService) ImportLeads(csvFilePath, deduplicationField string) // If we get a token refresh error, retry once if apiError.Category == "RefreshToken" { + + fmt.Println("Token refresh required. Retrying import after fetching new token.") + m.logger.Info("Token refresh required. Retrying import after fetching new token.") time.Sleep(5 * time.Second) // Wait for 5 seconds before retrying return m.attemptImport(uploadURL, csvFilePath, deduplicationField, uploadTimeStat) diff --git a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/authService.go b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/authService.go index a2d89bcb76..00432f0cba 100644 --- a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/authService.go +++ b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/authService.go @@ -54,6 +54,7 @@ func (m *MarketoAuthService) fetchOrUpdateAccessToken() error { return err } + // 1657211 accessToken.FetchedAt = time.Now().Unix() m.accessToken = accessToken return nil @@ -62,19 +63,26 @@ func (m *MarketoAuthService) fetchOrUpdateAccessToken() error { func (m *MarketoAuthService) GetAccessToken() (string, error) { - if m.accessToken.AccessToken == "" { - err := m.fetchOrUpdateAccessToken() - if err != nil { - return "", err - } - - // If the access token is nil or about to expire in 10 seconds, wait 10 seconds and fetch a new access token - } else if m.accessToken.FetchedAt+m.accessToken.ExpiresIn < 10 { - time.Sleep(11 * time.Second) - err := m.fetchOrUpdateAccessToken() - if err != nil { - return "", err - } + // if m.accessToken.AccessToken == "" { + // err := m.fetchOrUpdateAccessToken() + // if err != nil { + // return "", err + // } + + // // If the access token is nil or about to expire in 10 seconds, wait 10 seconds and fetch a new access token + // } else if m.accessToken.FetchedAt+m.accessToken.ExpiresIn-int64(time.Second) < 10 { + // time.Sleep(11 * time.Second) + // err := m.fetchOrUpdateAccessToken() + // if err != nil { + // return "", err + // } + // } + + // leeping simple logic for now + err := m.fetchOrUpdateAccessToken() + if err != nil { + return "", err } + return m.accessToken.AccessToken, nil } diff --git a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/marketobulkupload.go b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/marketobulkupload.go index bc5a4f61ea..d0e238c093 100644 --- a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/marketobulkupload.go +++ b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/marketobulkupload.go @@ -99,6 +99,9 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr } payloadSizeStat.Gauge(float64(fileInfo.Size())) + fmt.Println("File Upload Started for Dest Type ", destType) + fmt.Println("File size: ", fileInfo.Size()) + importID, apiError := b.apiService.ImportLeads(csvFilePath, b.destinationConfig.DeduplicationField) b.logger.Debugf("[Async Destination Manager] File Upload Finished for Dest Type %v", destType) @@ -106,6 +109,9 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr if apiError != nil { if apiError.Category == "RefreshToken" { + + fmt.Println("Token Expired at Upload - ", apiError.Message) + return common.AsyncUploadOutput{ FailedJobIDs: append(failedJobIDs, importingJobIDs...), FailedReason: "BRT: Error in Uploading File: Token Expired " + apiError.Message, @@ -115,7 +121,7 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr } switch apiError.StatusCode { - case 500: + case 429, 500: return common.AsyncUploadOutput{ FailedJobIDs: append(failedJobIDs, importingJobIDs...), FailedReason: "BRT: Error in Uploading File: " + apiError.Message, @@ -129,13 +135,6 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr AbortCount: len(failedJobIDs) + len(importingJobIDs), DestinationID: destinationID, } - case 429: - return common.AsyncUploadOutput{ - FailedJobIDs: append(failedJobIDs, importingJobIDs...), - FailedReason: "BRT: Error in Uploading File: " + apiError.Message, - FailedCount: len(failedJobIDs) + len(importingJobIDs), - DestinationID: destinationID, - } } } diff --git a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/utils.go b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/utils.go index 5f1bed0ceb..7dfb7c9aa6 100644 --- a/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/utils.go +++ b/router/batchrouter/asyncdestinationmanager/marketo-bulk-upload/utils.go @@ -209,7 +209,7 @@ func calculateHashCode(data []string) string { func sendHTTPRequest(uploadURL, csvFilePath string, accessToken string, deduplicationField string) (*http.Response, error) { file, err := os.Open(csvFilePath) if err != nil { - return nil, err + return nil, fmt.Errorf("error while opening the file: %v", err) } defer file.Close() @@ -218,11 +218,11 @@ func sendHTTPRequest(uploadURL, csvFilePath string, accessToken string, deduplic part, err := writer.CreateFormFile("file", filepath.Base(csvFilePath)) if err != nil { - return nil, err + return nil, fmt.Errorf("error while creating form file: %v", err) } _, err = io.Copy(part, file) if err != nil { - return nil, err + return nil, fmt.Errorf("error while copying file: %v", err) } _ = writer.WriteField("format", "csv") @@ -238,7 +238,7 @@ func sendHTTPRequest(uploadURL, csvFilePath string, accessToken string, deduplic req, err := http.NewRequest("POST", uploadURL, body) if err != nil { - return nil, err + return nil, fmt.Errorf("error while creating request: %v", err) } req.Header.Set("Content-Type", writer.FormDataContentType())